Skip to content
Open
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#### New Features

- Added support for the `INCLUDE_METADATA` copy option in `DataFrame.copy_into_table`, allowing users to include file metadata columns in the target table.

#### Bug Fixes

- Fixed a bug in `Session.client_telemetry` where traces did not have a Snowflake-style trace id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,11 +1357,13 @@ def file_operation_statement(


def convert_value_to_sql_option(
value: Optional[Union[str, bool, int, float, list, tuple]],
value: Optional[Union[str, bool, int, float, list, tuple, dict]],
parse_none_as_string: bool = False,
) -> str:
if value is None and parse_none_as_string:
value = str(value)
if isinstance(value, dict):
return f"({', '.join(f'{k} = {v}' for k, v in value.items())})"
if isinstance(value, str):
if len(value) > 1 and is_single_quoted(value):
return value
Expand Down
1 change: 1 addition & 0 deletions src/snowflake/snowpark/_internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
"TRUNCATECOLUMNS",
"FORCE",
"LOAD_UNCERTAIN_FILES",
"INCLUDE_METADATA",
}

COPY_INTO_LOCATION_COPY_OPTIONS = {
Expand Down
28 changes: 27 additions & 1 deletion src/snowflake/snowpark/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,12 @@
track_data_source_statement_params,
)
from snowflake.snowpark.async_job import AsyncJob, _AsyncResultType
from snowflake.snowpark.column import Column, _to_col_if_sql_expr, _to_col_if_str
from snowflake.snowpark.column import (
METADATA_COLUMN_TYPES,
Column,
_to_col_if_sql_expr,
_to_col_if_str,
)
from snowflake.snowpark.dataframe_ai_functions import DataFrameAIFunctions
from snowflake.snowpark.dataframe_analytics_functions import DataFrameAnalyticsFunctions
from snowflake.snowpark.dataframe_na_functions import DataFrameNaFunctions
Expand Down Expand Up @@ -4865,6 +4870,27 @@ def copy_into_table(
else None
)
copy_options = copy_options or reader_copy_options

if copy_options.get("INCLUDE_METADATA", None) is not None:
for metadata_col in copy_options["INCLUDE_METADATA"].values():
if quote_name(metadata_col.upper()) not in METADATA_COLUMN_TYPES:
raise ValueError(
f"Metadata column {metadata_col} is not supported. Supported columns: {list(METADATA_COLUMN_TYPES.keys())}"
)
if "MATCH_BY_COLUMN_NAME" not in copy_options:
raise ValueError(
"INCLUDE_METADATA can only be used with the MATCH_BY_COLUMN_NAME copy option."
)
if self._reader._file_type and self._reader._file_type.upper() == "CSV":
format_type_options = (
format_type_options.copy() if format_type_options else {}
)
if format_type_options.get("ERROR_ON_COLUMN_COUNT_MISMATCH", False):
raise ValueError(
"ERROR_ON_COLUMN_COUNT_MISMATCH must be False when INCLUDE_METADATA is used with CSV files."
)
format_type_options["ERROR_ON_COLUMN_COUNT_MISMATCH"] = False

validation_mode = validation_mode or self._reader._cur_options.get(
"VALIDATION_MODE"
)
Expand Down
97 changes: 82 additions & 15 deletions tests/ast/data/DataFrame.create_or_replace.test
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ df.copy_into_table(
statement_params={"foo": "bar"},
iceberg_config={"external_volume": "example_volume", "partition_by": [bucket(10, "n"), truncate(2, col("str"))], "target_file_size": "128MB", "catalog": "my_catalog"},
force=True,
INCLUDE_METADATA={"filename_col": "METADATA$FILENAME", "row_num_col": "METADATA$FILE_ROW_NUMBER"},
)

df3 = df.cache_result()
Expand All @@ -47,7 +48,7 @@ res3 = df.create_or_replace_temp_view(["test_db", "test_schema", "test_view"], c

res4 = df.create_or_replace_temp_view("test_view", statement_params={"foo": "bar"}, copy_grants=True)

df.copy_into_table(["test_db", "test_schema", "table2"], files=["file1", "file2"], pattern="[A-Z]+", validation_mode="RETURN_ERRORS", target_columns=["n", "str"], transformations=[col("n") * 10, col("str")], format_type_options={"COMPRESSION": "GZIP", "RECORD_DELIMITER": "|"}, statement_params={"foo": "bar"}, force=True, iceberg_config={"external_volume": "example_volume", "partition_by": [bucket(10, "n"), truncate(2, col("str"))], "target_file_size": "128MB", "catalog": "my_catalog"})
df.copy_into_table(["test_db", "test_schema", "table2"], files=["file1", "file2"], pattern="[A-Z]+", validation_mode="RETURN_ERRORS", target_columns=["n", "str"], transformations=[col("n") * 10, col("str")], format_type_options={"COMPRESSION": "GZIP", "RECORD_DELIMITER": "|"}, statement_params={"foo": "bar"}, force=True, INCLUDE_METADATA={"filename_col": "METADATA$FILENAME", "row_num_col": "METADATA$FILE_ROW_NUMBER"}, iceberg_config={"external_volume": "example_volume", "partition_by": [bucket(10, "n"), truncate(2, col("str"))], "target_file_size": "128MB", "catalog": "my_catalog"})

df = session.table("table1")

Expand Down Expand Up @@ -254,7 +255,7 @@ body {
bool_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -263,6 +264,72 @@ body {
}
}
}
copy_options {
_1: "INCLUDE_METADATA"
_2 {
seq_map_val {
kvs {
vs {
string_val {
src {
end_column: 9
end_line: 53
file: 2
start_column: 8
start_line: 41
}
v: "filename_col"
}
}
vs {
string_val {
src {
end_column: 9
end_line: 53
file: 2
start_column: 8
start_line: 41
}
v: "METADATA$FILENAME"
}
}
}
kvs {
vs {
string_val {
src {
end_column: 9
end_line: 53
file: 2
start_column: 8
start_line: 41
}
v: "row_num_col"
}
}
vs {
string_val {
src {
end_column: 9
end_line: 53
file: 2
start_column: 8
start_line: 41
}
v: "METADATA$FILE_ROW_NUMBER"
}
}
}
src {
end_column: 9
end_line: 53
file: 2
start_column: 8
start_line: 41
}
}
}
}
df {
dataframe_ref {
id: 1
Expand All @@ -276,7 +343,7 @@ body {
string_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -291,7 +358,7 @@ body {
string_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -306,7 +373,7 @@ body {
string_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -321,7 +388,7 @@ body {
list_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand Down Expand Up @@ -449,7 +516,7 @@ body {
string_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -464,7 +531,7 @@ body {
string_val {
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand All @@ -478,7 +545,7 @@ body {
}
src {
end_column: 9
end_line: 52
end_line: 53
file: 2
start_column: 8
start_line: 41
Expand Down Expand Up @@ -653,10 +720,10 @@ body {
}
src {
end_column: 31
end_line: 54
end_line: 55
file: 2
start_column: 14
start_line: 54
start_line: 55
}
}
}
Expand Down Expand Up @@ -685,10 +752,10 @@ body {
}
src {
end_column: 62
end_line: 56
end_line: 57
file: 2
start_column: 14
start_line: 56
start_line: 57
}
statement_params {
_1: "foo"
Expand Down Expand Up @@ -729,10 +796,10 @@ body {
}
src {
end_column: 128
end_line: 58
end_line: 59
file: 2
start_column: 8
start_line: 58
start_line: 59
}
warehouse: "test_wh"
}
Expand Down
105 changes: 95 additions & 10 deletions tests/integ/scala/test_dataframe_copy_into.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,23 +528,24 @@ def test_copy_json_write_with_column_names(session, tmp_stage_name1):
Utils.drop_table(session, table_name)


# Schema shared by the CSV "special format" tests below (header-skipping file
# format / PARSE_HEADER readers): an integer ID followed by three string columns.
special_format_schema = StructType(
    [
        StructField("ID", IntegerType()),
        StructField("USERNAME", StringType()),
        StructField("FIRSTNAME", StringType()),
        StructField("LASTNAME", StringType()),
    ]
)


def test_csv_read_format_name(session, tmp_stage_name1):
temp_file_fmt_name = Utils.random_name_for_temp_object(TempObjectType.FILE_FORMAT)
session.sql(
f"create temporary file format {temp_file_fmt_name} type = csv skip_header=1 "
"null_if = ('none','NA');"
).collect()
df = (
session.read.schema(
StructType(
[
StructField("ID", IntegerType()),
StructField("USERNAME", StringType()),
StructField("FIRSTNAME", StringType()),
StructField("LASTNAME", StringType()),
]
)
)
session.read.schema(special_format_schema)
.option("format_name", temp_file_fmt_name)
.csv(
f"@{tmp_stage_name1}/{test_file_csv_special_format}",
Expand Down Expand Up @@ -1442,3 +1443,87 @@ def create_and_append_check_answer(table_name_input):
# drop schema
Utils.drop_schema(session, schema)
Utils.drop_schema(session, double_quoted_schema)


def test_copy_into_table_include_metadata_requires_match_by_column_name(
    session, tmp_stage_name1
):
    """Using INCLUDE_METADATA without MATCH_BY_COLUMN_NAME must raise ValueError."""
    staged_file = f"@{tmp_stage_name1}/{test_file_csv}"
    target_table = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    reader_df = session.read.schema(user_schema).csv(staged_file)
    expected_error = (
        "INCLUDE_METADATA can only be used with the MATCH_BY_COLUMN_NAME copy option."
    )
    # The validation fires client-side, before any COPY INTO statement is issued.
    with pytest.raises(ValueError, match=expected_error):
        reader_df.copy_into_table(
            target_table,
            INCLUDE_METADATA={"filename_col": "METADATA$FILENAME"},
        )


def test_copy_into_table_include_metadata_with_error_on_column_count_mismatch(
    session, tmp_stage_name1
):
    """INCLUDE_METADATA on a CSV reader rejects ERROR_ON_COLUMN_COUNT_MISMATCH=True."""
    staged_file = f"@{tmp_stage_name1}/{test_file_csv}"
    target_table = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    reader_df = session.read.schema(user_schema).csv(staged_file)
    expected_error = (
        "ERROR_ON_COLUMN_COUNT_MISMATCH must be False when INCLUDE_METADATA is used with CSV files"
    )
    # Even with MATCH_BY_COLUMN_NAME supplied, the conflicting format option
    # must be rejected before any statement is sent to the server.
    with pytest.raises(ValueError, match=expected_error):
        reader_df.copy_into_table(
            target_table,
            format_type_options={"ERROR_ON_COLUMN_COUNT_MISMATCH": True},
            INCLUDE_METADATA={"filename_col": "METADATA$FILENAME"},
            MATCH_BY_COLUMN_NAME="CASE_INSENSITIVE",
        )


def test_copy_into_table_include_metadata_requires_supported_metadata_column(
    session, tmp_stage_name1
):
    """An unknown METADATA$ column name in INCLUDE_METADATA must raise ValueError."""
    staged_file = f"@{tmp_stage_name1}/{test_file_csv}"
    target_table = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    reader_df = session.read.schema(user_schema).csv(staged_file)
    # Only columns listed in METADATA_COLUMN_TYPES are accepted as values.
    with pytest.raises(
        ValueError,
        match="Metadata column NON_EXISTING_COLUMN is not supported",
    ):
        reader_df.copy_into_table(
            target_table,
            INCLUDE_METADATA={"filename_col": "NON_EXISTING_COLUMN"},
        )


def test_copy_into_table_include_metadata_csv(session, tmp_stage_name1):
    """End-to-end: COPY INTO with INCLUDE_METADATA populates metadata columns for CSV.

    Loads a staged CSV into a table with two extra columns and verifies that
    METADATA$FILENAME and METADATA$FILE_ROW_NUMBER were written alongside the data.
    """
    staged_file = f"@{tmp_stage_name1}/{test_file_csv_special_format}"
    target_table = Utils.random_name_for_temp_object(TempObjectType.TABLE)
    Utils.create_table(
        session,
        target_table,
        "id String, username String, firstname String, lastname String, filename_col String, row_num_col Int",
    )
    try:
        reader_df = (
            session.read.schema(special_format_schema)
            .option("PARSE_HEADER", True)
            .csv(staged_file)
        )
        # Metadata column names are accepted case-insensitively, quoted or not.
        reader_df.copy_into_table(
            target_table,
            MATCH_BY_COLUMN_NAME="CASE_INSENSITIVE",
            INCLUDE_METADATA={
                "filename_col": '"metadata$filename"',
                "ROW_NUM_COL": "METADATA$FILE_ROW_NUMBER",
            },
            force=True,
        )
        loaded_rows = session.table(target_table).sort("ID").collect()
        assert loaded_rows
        for expected_row_num, row in enumerate(loaded_rows, start=1):
            assert len(row) == 6
            assert not any(value is None for value in row)
            assert row["FILENAME_COL"] == test_file_csv_special_format
            assert row["ROW_NUM_COL"] == expected_row_num
    finally:
        Utils.drop_table(session, target_table)
Loading
Loading