Conversation
… fix attribute and value structType across children elements
…ma to execute; add bernoulli sampling for samplingRatio and tests
| return True | ||
| def _validate_row_for_type_mismatch( |
Is this function being called per row after each call of element_to_dict_or_str? I am a little concerned this could impact performance. Do you think it is possible to put this logic into element_to_dict_or_str so that we don't have to traverse each element of the row again?
Discussed offline: _validate_row_for_type_mismatch is O(n) where n is the number of schema fields/columns, whereas element_to_dict_or_str traverses every element recursively. Thus, _validate_row_for_type_mismatch's performance impact is minimal in comparison.
From a contextual point of view, _validate_row_for_type_mismatch needs to validate against the resulting transformed dict, so it is better to keep these functions semantically and sequentially separate.
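A minimal sketch (all names hypothetical, not the actual implementation) of why the per-row check stays cheap: it makes one shallow pass over the schema fields of the already-transformed dict, rather than recursing into nested XML elements the way element_to_dict_or_str does.

```python
from typing import Any, Callable, Dict


def validate_row_for_type_mismatch_sketch(
    row: Dict[str, Any],
    field_checkers: Dict[str, Callable[[Any], bool]],
) -> bool:
    # O(n) in the number of schema fields/columns: a single shallow pass
    # over the transformed dict, no recursion into nested XML elements.
    return all(check(row.get(name)) for name, check in field_checkers.items())
```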
| if corrupt_col_name in df_columns: | ||
| corrupt_ref = df[single_quote(corrupt_col_name)] | ||
| cols.append( | ||
| corrupt_ref.cast(StringType()).alias( |
What's the type of the column _corrupt_record? I'm wondering 1) if we need the cast, and 2) if the cast is always needed, can the string conversion logic be moved into the UDTF?
_corrupt_record will always be StringType, and the xml_reader.py UDTF always returns VARIANT for all columns, including _corrupt_record. If the user wants XML to return typed columns (guarded by if effective_schema is not None), the cast has to happen in dataframe_reader.py; it can't be moved into the UDTF.
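An illustrative pure-Python sketch of the column-selection decision described above (not the real Snowpark calls; the SQL strings are just stand-ins): data columns pass through as-is, while _corrupt_record, which the UDTF always emits as VARIANT, is cast to string only when a typed schema is in effect.

```python
from typing import List, Optional


def build_select_list(
    df_columns: List[str],
    corrupt_col_name: str,
    effective_schema: Optional[object],
) -> List[str]:
    cols = []
    for name in df_columns:
        if name == corrupt_col_name and effective_schema is not None:
            # The UDTF returns VARIANT for every column, so the corrupt
            # record column must be cast to string on the DataFrame side.
            cols.append(f'CAST("{name}" AS STRING) AS "{name}"')
        else:
            cols.append(f'"{name}"')
    return cols
```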
| def _resolve_xml_file_for_udtf(self, local_file_path: str) -> str: | ||
| """Return the UDTF file path, uploading to a temp stage in stored procedures.""" | ||
| if is_in_stored_procedure(): # pragma: no cover | ||
| temp_stage = random_name_for_temp_object(TempObjectType.STAGE) |
Why do we need an upload here? Is it because the server doesn't have the latest implementation?
The upload is for sproc execution, which needs the UDTF file to be on a stage for the later UDTF register_from_file call to work. Local execution does not need a file upload.
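A hedged sketch of the path-resolution logic being discussed (function, parameter, and stage names are hypothetical): inside a stored procedure the file is first uploaded to a stage so that a later register_from_file call can read it, while local execution passes the path through unchanged.

```python
from typing import Callable


def resolve_xml_file_for_udtf_sketch(
    local_file_path: str,
    in_stored_procedure: bool,
    upload_to_stage: Callable[[str, str], None],
    stage_name: str = "my_temp_stage",  # hypothetical stage name
) -> str:
    if in_stored_procedure:
        # Inside a sproc, register_from_file can only read the UDTF
        # source from a stage, hence the upload.
        file_name = local_file_path.rsplit("/", 1)[-1]
        stage_path = f"@{stage_name}/{file_name}"
        upload_to_stage(local_file_path, stage_path)
        return stage_path
    # Local execution can register the UDTF straight from the local path.
    return local_file_path
```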
| sql_create_temp_stage = ( | ||
| f"create temp stage if not exists {temp_stage} {XML_READER_SQL_COMMENT}" | ||
| ) | ||
| self._session.sql(sql_create_temp_stage, _emit_ast=False).collect( |
The Snowpark session has a get_session_stage method which creates a temp stage for the session; are we able to reuse that?
Yes, we can. I'll reuse it in the next commit.
| except IndexError: | ||
| raise ValueError(f"{path} does not exist") | ||
| num_workers = min(16, file_size // DEFAULT_CHUNK_SIZE + 1) |
Is the num_workers config inherited from the ingestion logic?
No, it's XML-specific. A previous perf benchmark showed UDTF parallelism plateauing at 16 workers, so this is inspired by the original Snowpark UDTF design here.
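To make the worker-count formula in the diff concrete, a small sketch (the DEFAULT_CHUNK_SIZE value is an assumed placeholder; the real constant may differ): one worker per chunk of the file, capped at 16 where the benchmark showed parallelism gains plateauing.

```python
DEFAULT_CHUNK_SIZE = 100 * 1024 * 1024  # assumed 100 MB; the real value may differ


def num_workers_for(file_size: int) -> int:
    # One worker per DEFAULT_CHUNK_SIZE chunk of the file, capped at 16,
    # the point where UDTF parallelism gains plateaued in benchmarks.
    return min(16, file_size // DEFAULT_CHUNK_SIZE + 1)
```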
| partial_schema = type_string_to_type_object(schema_str) | ||
| except Exception: | ||
| continue | ||
| if not isinstance(partial_schema, StructType): |
In what case is a partial_schema returned?
results will contain up to 16 rows, each mapping to the schema inferred from a uniformly split byte range of the file. partial_schema captures the schema from each of the 16 workers and is individually merged/widened into the previously merged merged_schema.
If you were wondering when the exception happens: realistically it should never fire, since xml_schema_inference.py handles types in a tight roundtrip (_case_preserving_simple_string and type_string_to_type_object). This is a defensive guard to ensure one bad worker result doesn't crash the entire schema inference flow.
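The merge loop described above can be sketched as follows (parse and merge are injected stand-ins for type_string_to_type_object and the schema-widening logic): each worker's schema string is parsed and folded into the running merged schema, and a result that fails to parse is skipped rather than aborting inference.

```python
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def merge_partial_schemas(
    results: List[str],
    parse: Callable[[str], T],
    merge: Callable[[T, T], T],
) -> Optional[T]:
    merged: Optional[T] = None
    for schema_str in results:
        try:
            partial = parse(schema_str)
        except Exception:
            # Defensive guard: one bad worker result is dropped instead
            # of crashing the whole schema inference flow.
            continue
        merged = partial if merged is None else merge(merged, partial)
    return merged
```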
| def _can_cast_to_type(value: str, target_type: DataType) -> bool: | ||
| if isinstance(target_type, StringType): | ||
| return True | ||
| if isinstance(target_type, LongType): |
Does this cover all types? I don't see the decimal type here, so does it mean we don't need to handle the decimal type?
Right, Spark by default doesn't infer decimal for XML. Spark's DecimalType inference is gated behind options.prefersDecimal, which defaults to false:
case v if options.prefersDecimal && decimalTry.isDefined => decimalTry.get
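A simplified sketch of the checks under discussion (type names are plain strings here for illustration; the real code dispatches on Snowpark DataType instances): numeric types are probed with a try/except round-trip, DecimalType is intentionally absent per the prefersDecimal default, and unhandled types currently fall through to True.

```python
def can_cast_to_type_sketch(value: str, target_type: str) -> bool:
    if target_type == "string":
        # Any value is representable as a string.
        return True
    if target_type == "long":
        try:
            int(value)
            return True
        except (ValueError, TypeError):
            return False
    if target_type == "double":
        try:
            float(value)
            return True
        except (ValueError, TypeError):
            return False
    # No decimal branch: Spark only infers DecimalType for XML when
    # options.prefersDecimal is set, and it defaults to false.
    # Unhandled types (e.g. a time type) fall through to True.
    return True
```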
| column_name_of_corrupt_record, | ||
| ) | ||
| if row is not None: |
I want to confirm:
- Is the behavior in Spark that a mismatched row is skipped instead of being presented as a null row?
- Does it change the current Snowpark behavior?
- Only when mode=DROPMALFORMED is the row skipped. When mode=PERMISSIVE, the bad fields are nullified and added to _corrupt_record. When mode=FAILFAST, an error is raised. _validate_row_for_type_mismatch handles these behaviors by checking mode.
- The original behavior without inferSchema or a custom schema is preserved, since no VARIANT casting was needed. This does not change custom schema behavior, but prevents custom schemas from raising SQL casting errors where they shouldn't.
| ignore_surrounding_whitespace, | ||
| row_validation_xsd_path=row_validation_xsd_path, | ||
| result_template=result_template, | ||
| schema_type=schema_type, |
Does this change the existing behavior in Snowpark? Previously we didn't do _validate_row_for_type_mismatch; is it always done now?
_validate_row_for_type_mismatch is guarded by if schema_type is not None, which only applies to the custom schema and infer schema cases. It enhances custom schema behavior and leaves the original default Variant-returning XML parser behavior unchanged.
This fixes an issue that would previously throw an exception for custom schema (and now infer schema): if a schema that mismatches the input data type is provided, the original behavior would fail with a SQL cast exception.
| return True | ||
| except (ValueError, TypeError): | ||
| return False | ||
| if isinstance(target_type, TimestampType): |
Currently, this function will return True when the target type is not in the checks (not StringType, LongType, DoubleType, ...). For example, if the target type is TimeType, this function always returns True; is this expected?
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-3192256
Please describe how your code solves the related issue.
Published the PR to trigger the merge gate for test results, but this PR is still a WIP. Will add a Google Doc link to the design doc.