refactor: Make CDF use TableConfiguration and refactor log replay#645
refactor: Make CDF use TableConfiguration and refactor log replay#645OussamaSaoudi wants to merge 4 commits into
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #645 +/- ##
==========================================
+ Coverage 84.78% 84.83% +0.04%
==========================================
Files 88 88
Lines 22605 22679 +74
Branches 22605 22679 +74
==========================================
+ Hits 19166 19239 +73
+ Misses 2459 2458 -1
- Partials 980 982 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
774fd50 to
23f4b6f
Compare
| configuration: HashMap::from([ | ||
| ("delta.enableChangeDataFeed".to_string(), "true".to_string()), | ||
| ( | ||
| "delta.enableDeletionVectors".to_string(), | ||
| "true".to_string(), | ||
| ), | ||
| ("delta.columnMapping.mode".to_string(), "none".to_string()), |
There was a problem hiding this comment.
aside: I wish rust had better (more transparent) handling of String vs. &str... code like this gets so ugly and unwieldy.
There was a problem hiding this comment.
yep the to_string everywhere is kind of gross, but at least we're aware of where the allocations are happening
|
|
||
| fn table_config() -> TableConfiguration { | ||
| let schema_string = serde_json::to_string(&get_schema()).unwrap(); | ||
| let metadata = Metadata { |
There was a problem hiding this comment.
Out of curiosity, why is Metadata infallible while Protocol is fallible? Seems like any number of things could go wrong with it, such as partition column names that aren't actually in the schema?
There was a problem hiding this comment.
Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data? So then, any time you see a P or M in isolation, you have to assume it's broken in arbitrary ways. It's only "for sure" self-consistent and valid if a TC successfully wrapped it.
There was a problem hiding this comment.
I think Metadata just doesn't have a constructor for kernel/rust data that does these checks. All it has is pub fn try_new_from_data(data: &dyn EngineData)
There was a problem hiding this comment.
Part of me wonders if we should make a clean break between P&M as "plain old data" vs. table configuration as the one stop shop for validation of that raw data?
Ohhh so proposing that we move the validation that we do in Protocol::try_new to TC? Then this Metadata validation could also live there.
Moreover if we think of protocol as just "raw, unchecked data", I'm starting to wonder if we should move ensure_read_supported, has_writer_feature and has_reader_feature to TC?
There was a problem hiding this comment.
Yeah... TC becomes the logical heart of the table, with P&M as simple raw information underneath
There was a problem hiding this comment.
Note for anyone reading this thread, I'm tracking table config stuff in #650
| Some([ReaderFeatures::DeletionVectors]), | ||
| Some([ReaderFeatures::ColumnMapping]), |
There was a problem hiding this comment.
Should one of those be WriterFeatures? Also, the spec says:
Reader features should be listed in both
readerFeaturesandwriterFeaturessimultaneously, while writer features should be listed only inwriterFeatures. It is not allowed to list a feature only inreaderFeaturesbut not inwriterFeatures.
There was a problem hiding this comment.
This seems to be the case for all reader features. If that's the case, then ensure_read_supported should be checking that the reader features exist in both the reader and and the writer features list for a given protocol.
| // Note: We do not perform data skipping yet because we need to visit all add and | ||
| // remove actions for deletion vector resolution to be correct. | ||
| // | ||
| // Consider a scenario with a pair of add/remove actions with the same path. The add |
95cac9c to
5474769
Compare
| Some::<Vec<String>>(vec![]), | ||
| Some::<Vec<String>>(vec![]), |
There was a problem hiding this comment.
Note: Creating Protocol from None is actually pretty challenging since it doesn't know the type of T in Option::<T>::None. Also, the type try_new takes for reader/writer features is Option<impl IntoIterator<Item = impl Into<String>>> shouldn't we be using impl ToString?
There was a problem hiding this comment.
Into<String> and ToString have different characteristics, especially wrt String:
Into<String> for Stringis a moveToString for Stringmakes a copy- More types impl
ToStringthan implInto<String>
Dunno which we "should" use here...
There was a problem hiding this comment.
Hmm i guess we should prefer moves. Though this is affected by the proposed changes to move all logic to TableConfiguration, so I'll leave this for now and address it in #650
5474769 to
436852b
Compare
| let string_list: DataType = ArrayType::new(STRING, false).into(); | ||
| let string_string_map = MapType::new(STRING, STRING, false).into(); | ||
| let str_list: DataType = ArrayType::new(STRING, false).into(); | ||
| let str_str_map: DataType = MapType::new(STRING, STRING, false).into(); |
There was a problem hiding this comment.
Note: Renaming so that the lines below don't get too long.
scovich
left a comment
There was a problem hiding this comment.
LGTM, except one possible latent bug to double-check
| // The only (path, deletion_vector) pairs we must track are ones whose path is the | ||
| // same as an `add` action. | ||
| remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path)); | ||
| if has_metadata { |
There was a problem hiding this comment.
Maybe this makes it more self-describing?
| if has_metadata { | |
| if metadata_changed { |
There was a problem hiding this comment.
Would this be better as this?
require!(
!metadata_changed || table_schema.as_ref() == table_configuration.schema(),
Error::change_data_feed_incompatible_schema(
table_schema,
table_configuration.schema()
)
);There was a problem hiding this comment.
Hmm this may encourage an expensive swap
table_schema.as_ref() == table_configuration.schema() || !metadata_changedI'll keep it using the if for now.
| getters[14].get_int(i, "protocol.min_reader_version")? | ||
| { | ||
| let protocol = | ||
| ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?; | ||
| ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[14..=17])?; |
There was a problem hiding this comment.
This looks wrong (existing bug)? Wouldn't we read [14] twice (or [12] previously)?
Or does it demand the full list even tho it won't use the first one?
There was a problem hiding this comment.
visit_protocol expects all fields of protocol to be passed in, even though it only reads elements 1,2, 3 (ignores 0).
visit_protocol:
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of ProtocolVisitor getters: {}",
getters.len()
))
);
let min_writer_version: i32 = getters[1].get(row_index, "protocol.min_writer_version")?;
let reader_features: Option<Vec<_>> =
getters[2].get_opt(row_index, "protocol.reader_features")?;
let writer_features: Option<Vec<_>> =
getters[3].get_opt(row_index, "protocol.writer_features")?;
Protocol::try_new(
min_reader_version,
min_writer_version,
reader_features,
writer_features,
)I could change this, but maybe that could be another PR?
There was a problem hiding this comment.
No need -- I was just verifying that we didn't have a double-use of that field by mentioning it twice.
fae5a21 to
0ece561
Compare
552c5f0 to
7243c36
Compare
zachschuermann
left a comment
There was a problem hiding this comment.
few questions/comments otherwise LGTM!
| /// - `has_cdc_action` which is `true` when there is a `cdc` action present in the commit. This is | ||
| /// used in [`cdf_commit_to_scan_batches`] to correctly generate a selection vector over the actions | ||
| /// in the commit. | ||
| struct ProcessedCdfCommit { |
There was a problem hiding this comment.
organization nit: I think we could still have the 'constructor' abstraction and make process_cdf_commit a try_new and also make cdf_commit_to_scan_batches just into_scan_batches
that (to me) feels like a nice paradigm:
// do all the work to process a commit
let processed_commit = ProcessedCdfCommit::try_new(...)?;
// consume self and give back the scan batches
let scan_data_iter = processed_commit.into_scan_batches(...)?;EDIT: also now revisiting it looks like this is pretty much what we had before. any reason to make them static functions instead of those methods?
There was a problem hiding this comment.
Pros for new approach
Here are reasons I had in mind:
- I remember comments saying that the LogReplayScanner could use a refactor back when I was building it. Ryan mentioned that the internal structure was good for respecting rule of 30 and so on, but my impression is that it was a comment on the function size/responsibility rather than the api of LogReplayScanner.
- Mapping 2 functions makes the 2 phases rly clear. Also because each function does a single thing, I think the documentation got a lot simpler.
Cons
I was going to add that
in the future if we wanted to test this more thoroughly, you can test each phase individually. That was not the case before.
But then I realized: cdf_commit_to_scan_batches doesn't work without a consistent and correct ProcessedCdfCommit created by process_cdf_commit. There's a dependency between these two functions 🤔
Aside
As an aside, I have a sense that structs come in 2 flavors:
- "Just data". Ex: actions like
Protocol,SocketAddr,Url - Structs that need to maintain some internal consistency with its state. Ex:
TableConfiguration,BTreeMap,Snapshot.
maybe my assessment is wrong, but LogReplayScanner doesn't feel like it fully fits in 2. This might be the source of your initial apprehension in the comment above. It most definitely isn't in category 1.
There was a problem hiding this comment.
In any case, I'm happy to revert back to the original design :) I'd also like to get your take on this pattern for struts that I mentioned.
| id, | ||
| name, | ||
| description, | ||
| name: getters[1].get_opt(row_index, "metaData.name")?, |
There was a problem hiding this comment.
interesting i just noticed we had all lowercase before.. guess we are case-insensitive?
There was a problem hiding this comment.
seems like it. idk what the column name restrictions are in delta, but we should probably keep those.
There was a problem hiding this comment.
Delta can be case-sensitive, but those strings are only for error messages so it shouldn't matter.
| commit_files: impl IntoIterator<Item = ParsedLogPath>, | ||
| table_schema: SchemaRef, | ||
| physical_predicate: Option<(ExpressionRef, SchemaRef)>, | ||
| mut table_configuration: TableConfiguration, |
There was a problem hiding this comment.
I suppose this is okay since we are taking ownership of the TableConfiguration and just updating it as we go.. Though I wonder if there would be a better (read: less mutability) way to do updates to configuration leveraging Cow and some ability to construct new TableConfiguration incrementally? I don't have a proposal here, just that we may want to revisit this
There was a problem hiding this comment.
Cow prolly doesn't make sense in this scenario since I think that works better for pointer-based data (like graphs, linked lists, trees, etc). I think immutability is gonna take some funky iterator stuff, but I can revisit this in the future.
| *table_configuration = TableConfiguration::try_new( | ||
| metadata.unwrap_or_else(|| table_configuration.metadata().clone()), | ||
| protocol.unwrap_or_else(|| table_configuration.protocol().clone()), | ||
| table_configuration.table_root().clone(), | ||
| commit_file.version, | ||
| )?; |
There was a problem hiding this comment.
related to comment above - this feels like the need for an API that will allow us to do better here instead of cloning
| table_schema.as_ref() == table_configuration.schema(), | ||
| Error::change_data_feed_incompatible_schema( | ||
| table_schema, | ||
| table_configuration.schema() | ||
| ) |
There was a problem hiding this comment.
and will this evolve to use the new schema resolution soon?
There was a problem hiding this comment.
Yep schema evolution is another PR 👍
| log_segment, | ||
| start_version, | ||
| schema, | ||
| start_table_config: start_snapshot.table_configuration().clone(), |
There was a problem hiding this comment.
can we avoid the clone? aren't we throwing away the start_snapshot? oh i guess if this returns ref we need to clone.. hm maybe there's a case for some into_config conversions etc.? (thinking out loud - not to solve here)
There was a problem hiding this comment.
Yeah start_snapshot is Arc, so we probably can't take.
0a7c859 to
812d445
Compare
| id, | ||
| name, | ||
| description, | ||
| name: getters[1].get_opt(row_index, "metaData.name")?, |
There was a problem hiding this comment.
Delta can be case-sensitive, but those strings are only for error messages so it shouldn't matter.
| self.metadata = Some(Metadata { | ||
| id, | ||
| schema_string: getters[11].get(i, "metaData.schemaString")?, | ||
| partition_columns: getters[12].get(i, "metaData.partitionColumns")?, | ||
| configuration: configuration_map_opt.unwrap_or_else(HashMap::new), |
There was a problem hiding this comment.
Hmm the idea was to project out all the other columns in Metadata that we don't care about for CDF. Do you reckon it's better to just visit the entire Metadata struct?
There was a problem hiding this comment.
I see... didn't realize we were pruning columns here.
|
|
||
| /// Ensures that change data feed is enabled in `table_properties`. See the documentation | ||
| /// of [`TableChanges`] for more details. | ||
| fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> { |
There was a problem hiding this comment.
Maybe I'm missing something, but what code replaces check_cdf_table_properties and ensure_cdf_read_supported? Were these duplicating existing TableConfiguration code?
There was a problem hiding this comment.
Yep these were essentially duplicated when I first created TableConfiguration. The idea was to keep that PR small by doing the move in two steps: 1) implement in tableconfig, 2) swap out implementations in CDF
5380643 to
eec0cfc
Compare

What changes are proposed in this pull request?
This PR replaces the old CDF protocol and metadata checks to use
TableConfiguration. This paves the way for checking deletion vector enablement, in-commit timestamp enablement, and verifying column mapping support in the future.This PR also refactors LogReplay to clarify naming.
How was this change tested?
This is a refactor. All existing tests pass.