From 8144333eb17cc7262ce99131376be0ca850bbce0 Mon Sep 17 00:00:00 2001
From: Adam Ling
Date: Mon, 9 Feb 2026 17:25:53 -0800
Subject: [PATCH] inter operate poc

---
 src/snowflake/snowpark/dataframe.py | 34 +++++++++++++++
 src/snowflake/snowpark/session.py   | 64 +++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+)

diff --git a/src/snowflake/snowpark/dataframe.py b/src/snowflake/snowpark/dataframe.py
index 8d557ef992..90623edb22 100644
--- a/src/snowflake/snowpark/dataframe.py
+++ b/src/snowflake/snowpark/dataframe.py
@@ -6914,6 +6914,40 @@ def print_schema(self, level: Optional[int] = None) -> None:
         """
         print(self._format_schema(level))  # noqa: T201: we need to print here.
 
+    def to_snowpark_connect_dataframe(
+        self,
+        spark_session: "pyspark.sql.SparkSession",
+        view_name: Optional[str] = None,
+    ) -> "pyspark.sql.DataFrame":
+        """Convert this Snowpark DataFrame to a PySpark DataFrame via Snowpark Connect
+        (lazy, no data materialization).
+
+        Creates a Snowflake temporary view from this DataFrame's logical plan, then has
+        PySpark read from that view through Snowpark Connect. The query plan is preserved
+        end-to-end; no data is materialized until an action is triggered on the returned
+        PySpark DataFrame.
+
+        Args:
+            spark_session: A PySpark SparkSession connected via Snowpark Connect.
+            view_name: Optional custom name for the intermediate Snowflake temporary view.
+                If not provided, a unique name is auto-generated.
+
+        Returns:
+            A PySpark DataFrame backed by the same Snowflake query plan.
+
+        Example::
+
+            >>> snowpark_df = session.create_dataframe([[1, "Alice"], [2, "Bob"]], schema=["id", "name"])
+            >>> spark_df = snowpark_df.to_snowpark_connect_dataframe(spark)  # doctest: +SKIP
+            >>> spark_df.filter(spark_df.id > 1).show()  # doctest: +SKIP
+        """
+        import uuid
+
+        if view_name is None:
+            view_name = f"__snowpark_to_spark_{uuid.uuid4().hex[:8]}"
+        self.create_or_replace_temp_view(view_name)
+        return spark_session.sql(f"SELECT * FROM {view_name}")
+
     where = filter
 
     # Add the following lines so API docs have them
diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py
index 99293569ff..71e016286d 100644
--- a/src/snowflake/snowpark/session.py
+++ b/src/snowflake/snowpark/session.py
@@ -4028,6 +4028,70 @@ def convert_row_to_list(
 
         return df
 
+    def from_snowpark_connect_dataframe(
+        self,
+        spark_df: "pyspark.sql.DataFrame",
+        view_name: Optional[str] = None,
+    ) -> DataFrame:
+        """Create a Snowpark DataFrame from a PySpark DataFrame via Snowpark Connect
+        (lazy, no data materialization).
+
+        Registers the PySpark DataFrame as a Spark-local temporary view, then uses
+        ``spark.sql()`` to create a Snowflake view from it (Snowpark Connect resolves
+        the local view's logical plan into the DDL sent to Snowflake), and finally
+        reads that view back as a Snowpark DataFrame.
+
+        The query plan is preserved end-to-end; no data is materialized until an
+        action is triggered on the returned Snowpark DataFrame.
+
+        Args:
+            spark_df: A PySpark DataFrame from a Snowpark Connect session.
+            view_name: Optional custom name for the intermediate Snowflake view, which
+                is a permanent view. If not provided, a unique name is auto-generated.
+
+        Returns:
+            A Snowpark :class:`DataFrame` backed by the same Snowflake query plan.
+
+        Example::
+
+            >>> spark_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])  # doctest: +SKIP
+            >>> snowpark_df = session.from_snowpark_connect_dataframe(spark_df)  # doctest: +SKIP
+            >>> snowpark_df.show()  # doctest: +SKIP
+        """
+        import uuid
+
+        if view_name is None:
+            view_name = f"__spark_to_snowpark_{uuid.uuid4().hex[:8]}"
+
+        local_view = f"_local_{view_name}"
+        spark_session = spark_df.sparkSession
+
+        # Step 1: Register the PySpark DataFrame as a Spark-local temp view.
+        spark_df.createOrReplaceTempView(local_view)
+
+        # Step 2: Create a real Snowflake VIEW via spark_session.sql().
+        #
+        # We MUST use spark_session.sql() (not self.sql()) because only the Spark
+        # Connect server can resolve the local temp view name from Step 1.
+        #
+        # We use CREATE VIEW (permanent), NOT CREATE TEMPORARY VIEW, because
+        # Snowpark Connect by default handles "CREATE TEMPORARY VIEW" locally in
+        # Spark's in-memory catalog (CreateViewCommand → store_temporary_view_as_dataframe).
+        # Only "CREATE VIEW" (CreateView logical plan) actually sends DDL to Snowflake
+        # via Snowpark Python's create_or_replace_view().
+        #
+        # Note: This creates a permanent Snowflake view. Snowflake views are just
+        # stored query definitions (no data is materialized). The caller can drop
+        # the view after use with: session.sql("DROP VIEW IF EXISTS <view_name>").collect()
+        # TODO: Can we create a real Snowflake temporary view through the Snowpark Connect session?
+        # A permanent view is leaked if the caller never drops it; consider changing the snowpark-connect view implementation to support temporary views.
+        spark_session.sql(
+            f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {local_view}"
+        ).collect()
+
+        # Step 3: Return a Snowpark DataFrame backed by the Snowflake view.
+        return self.table(view_name)
+
     @publicapi
     def range(
         self,
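
Usage sketch for the Snowpark-to-PySpark direction (to_snowpark_connect_dataframe). This is illustrative, not part of the patch: it assumes an existing Snowpark `session` and a PySpark `spark` session that is already connected through Snowpark Connect to the same Snowflake account, which is what the new method requires:

    # Assumes: `session` is a snowflake.snowpark.Session, `spark` is a PySpark
    # SparkSession served by Snowpark Connect against the same account.
    snowpark_df = session.create_dataframe(
        [[1, "Alice"], [2, "Bob"]], schema=["id", "name"]
    )

    # Lazy handoff: this only creates a Snowflake temporary view and points
    # PySpark at it; no data moves yet.
    spark_df = snowpark_df.to_snowpark_connect_dataframe(spark)

    # The Snowflake query plan behind the view runs only when an action fires.
    spark_df.filter(spark_df.id > 1).show()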
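A sketch of the reverse direction (from_snowpark_connect_dataframe), including the manual cleanup the Step 2 comment calls for: until the TODO about real temporary views is resolved, the intermediate Snowflake view is permanent, so the explicit view_name and the DROP VIEW statement below are one way to avoid leaking it. Both are illustrative choices, not part of the patch:

    # Same assumptions as above: `spark` is a Snowpark Connect SparkSession and
    # `session` is the Snowpark session behind it.
    spark_df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

    # Pass an explicit view_name so the permanent intermediate view is easy to
    # find and drop later.
    snowpark_df = session.from_snowpark_connect_dataframe(
        spark_df, view_name="spark_to_snowpark_demo"
    )
    snowpark_df.show()  # first action; the view itself stores only a query definition

    # The intermediate view is a permanent Snowflake object (see the Step 2 note),
    # so drop it once the Snowpark DataFrame is no longer needed.
    session.sql("DROP VIEW IF EXISTS spark_to_snowpark_demo").collect()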