[PySpark] fix: support createDataFrame with list of dicts in Spark API#387
Closed
mariotaddeucci wants to merge 6 commits into duckdb:main from
Conversation
Port schema inference from duckdb/duckdb#18051 to fix duckdb#183. When calling `spark.createDataFrame([{"col": value}, ...])`, the Spark API now infers the schema from dict keys, matching PySpark behavior.

Changes:
- Add `_type_mappings`, `_array_type_mappings`, `_has_nulltype`, `_merge_type`, `_infer_type`, and `_infer_schema` functions to types.py
- Update session.py to handle dict rows in `_combine_data_and_schema` and add a schema inference branch in `createDataFrame` for `list[dict]`
- Add an `_inferSchemaFromList` method to `SparkSession`
- Fix `test_struct_column` to use inferred field names instead of col0/col1
- Add a `test_dataframe_from_list_dicts` test case
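The core of the inference step is unioning dict keys across all rows, so rows with missing keys still fit the final schema. A minimal pure-Python sketch of that key-union step (the `infer_fields` helper name is illustrative, not the PR's actual function):

```python
def infer_fields(rows):
    # Union keys across all dict rows, preserving first-seen order,
    # so a key appearing only in a later row still becomes a field.
    fields = []
    for row in rows:
        for key in row:
            if key not in fields:
                fields.append(key)
    return fields

print(infer_fields([{"a": 1, "b": 2}, {"b": 3, "c": 4}]))  # ['a', 'b', 'c']
```

Rows missing a key then simply yield `None` (SQL NULL) for that field.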
Contributor
Author
Closing in favor of a cleaner branch with only the relevant commit. New PR coming.
Contributor
Pull request overview
This PR improves DuckDB’s experimental Spark API compatibility by adding schema inference for createDataFrame() when passed list[dict], and it also introduces initial WindowSpec/window-function support used by new tests.
Changes:
- Infer a `StructType` schema from `list[dict]` input (unioning keys across rows) and align dict row values by schema field order.
- Add schema inference utilities in `sql/types.py` (`_infer_schema`, `_infer_type`, `_merge_type`, `_has_nulltype`).
- Introduce basic window specification & window functions (`WindowSpec`, `Window`, `Column.over`, and functions like `row_number`, `lag`, etc.) plus associated tests and namespace plumbing.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| tests/spark_namespace/sql/window.py | Adds namespace shim to import Window from PySpark vs DuckDB Spark API. |
| tests/fast/spark/test_spark_functions_window.py | Adds coverage for window specs/functions behavior (orderBy/partitionBy/rowsBetween/rangeBetween/lag/lead/etc.). |
| tests/fast/spark/test_spark_dataframe.py | Adds test for createDataFrame(list_of_dicts) schema inference and missing keys. |
| tests/fast/spark/test_spark_column.py | Simplifies struct column test now that field names are inferred correctly. |
| external/duckdb | Bumps DuckDB submodule revision (dependency update). |
| duckdb/experimental/spark/sql/window.py | Implements WindowSpec and Window API. |
| duckdb/experimental/spark/sql/types.py | Adds type mapping + schema/type inference & merge helpers. |
| duckdb/experimental/spark/sql/session.py | Wires schema inference for list input and dict-row alignment in _combine_data_and_schema. |
| duckdb/experimental/spark/sql/functions.py | Adds window functions wrappers (row_number, rank, lag/lead, etc.). |
| duckdb/experimental/spark/sql/column.py | Adds Column.over(WindowSpec) to render ... OVER (...) SQL. |
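As a rough illustration of what `Column.over(WindowSpec)` has to produce, a window spec ultimately boils down to assembling an `OVER (...)` clause around an expression. This sketch is hypothetical (names and signature are not the PR's actual code), but shows the shape of the rendered SQL:

```python
def over_sql(expr, partition_by=(), order_by=()):
    # Illustrative only: render "<expr> OVER (PARTITION BY ... ORDER BY ...)"
    # the way a Column.over(WindowSpec) implementation might.
    parts = []
    if partition_by:
        parts.append("PARTITION BY " + ", ".join(partition_by))
    if order_by:
        parts.append("ORDER BY " + ", ".join(order_by))
    return f"{expr} OVER ({' '.join(parts)})"

print(over_sql("row_number()", partition_by=["dept"], order_by=["salary DESC"]))
# row_number() OVER (PARTITION BY dept ORDER BY salary DESC)
```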
Comment on lines +1 to +6
```python
from collections.abc import Sequence

from ..errors import PySparkTypeError
from ..exception import ContributionsAcceptedError
from ._typing import ColumnOrName
from .column import Column
```
```python
        new_window._range_between = self._range_between
        return new_window

    def partitionBy(self, *cols: ColumnOrName | Sequence[ColumnOrName]) -> "WindowSpec":
```
Comment on lines +44 to +47
```python
        all_cols: list[ColumnOrName] | list[list[ColumnOrName]] = list(cols)  # type: ignore[assignment]

        if isinstance(all_cols[0], list):
            all_cols = all_cols[0]
```
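The quoted snippet normalizes the two call styles `partitionBy("a", "b")` and `partitionBy(["a", "b"])`, but indexes `all_cols[0]` without guarding against an empty call, which would raise `IndexError`. A standalone sketch with that guard added (the helper name is hypothetical):

```python
def normalize_cols(*cols):
    # Accept either varargs of columns or a single list of columns.
    all_cols = list(cols)
    # Guard the empty case before peeking at all_cols[0];
    # the quoted snippet above omits this check.
    if all_cols and isinstance(all_cols[0], list):
        all_cols = all_cols[0]
    return all_cols

print(normalize_cols("a", "b"))    # ['a', 'b']
print(normalize_cols(["a", "b"]))  # ['a', 'b']
```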
Comment on lines +164 to +165
```python
        return f"{start} PRECEDING AND {end} FOLLOWING"
```

```python
    def test_moving_average_last_3_points(self, spark):
        data = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)]
        df = spark.createDataFrame(data=data, schema=["idx", "value"])
        w = Window.orderBy("idx").rowsBetween(2, Window.currentRow)
```
```python
        # rows within a value distance of 2 up to the current row.
        data = [(1, 10), (2, 20), (3, 30), (4, 40), (6, 60)]
        df = spark.createDataFrame(data=data, schema=["idx", "value"])
        w = Window.orderBy("idx").rangeBetween(2, Window.currentRow)
```
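In PySpark, frame boundaries passed to `rowsBetween(start, end)` are offsets relative to the current row, and negative values mean *preceding* rows, so "the last 3 points" is `rowsBetween(-2, Window.currentRow)`; a start of `2` would instead mean 2 rows *following*. The intended frame, computed in plain Python for comparison:

```python
def moving_avg(values, start=-2, end=0):
    # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW, as plain Python.
    # Negative offsets look backwards, matching rowsBetween(-2, 0).
    out = []
    for i in range(len(values)):
        lo = max(0, i + start)
        hi = min(len(values) - 1, i + end)
        window = values[lo:hi + 1]
        out.append(sum(window) / len(window))
    return out

print(moving_avg([10, 20, 30, 40, 50]))  # [10.0, 15.0, 20.0, 30.0, 40.0]
```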
Comment on lines 154 to 157
```python
    if isinstance(schema, StructType):
        types, names = schema.extract_types_and_names()
    else:
        names = schema
```
Comment on lines 40 to 48
```diff
     new_data = []
     for row in data:
-        new_row = [Value(x, dtype.duckdb_type) for x, dtype in zip(row, [y.dataType for y in schema], strict=False)]
+        if isinstance(row, dict):
+            row_values = list(map(row.get, schema.fieldNames()))
+        else:
+            row_values = list(row)
+        new_row = [Value(x, dtype.duckdb_type) for x, dtype in zip(row_values, [y.dataType for y in schema], strict=False)]
         new_data.append(new_row)
     return new_data
```
```python
import duckdb
from duckdb.sqltypes import DuckDBPyType

from ..errors.exceptions.base import PySparkTypeError
```
```python
        Parameters
        ----------
        window : :class:`WindowSpec`
```
Summary

When calling `spark.createDataFrame([{"col": value}, ...])`, the Spark API now infers the schema from dict keys, matching PySpark behavior.

Changes

- `duckdb/experimental/spark/sql/types.py`
  - Add `_type_mappings` and `_array_type_mappings` dicts mapping Python types to Spark SQL DataTypes
  - Add `_has_nulltype()` to check for NullType in a schema tree
  - Add `_merge_type()` to merge two DataTypes (used when inferring schema across multiple rows)
  - Add `_infer_type()` to infer a DataType from a Python object
  - Add `_infer_schema()` to infer a StructType schema from a dict/namedtuple/Row/object
- `duckdb/experimental/spark/sql/session.py`
  - Update `_combine_data_and_schema()` to handle dict rows by extracting values in schema field order
  - Add a schema inference branch to `createDataFrame()` for `list[dict]` input without an explicit schema
  - Add an `_inferSchemaFromList()` method to `SparkSession`
- `tests/fast/spark/test_spark_column.py`
  - `test_struct_column`: removed `USE_ACTUAL_SPARK` branching since Row field names are now correctly inferred
- `tests/fast/spark/test_spark_dataframe.py`
  - Add `test_dataframe_from_list_dicts` covering dicts with different key orders and missing keys
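When inferring across multiple rows, `_merge_type()` must reconcile per-row inferred types: a NullType (from a `None` value or missing key) should yield to any concrete type, while conflicting concrete types are an error. A minimal sketch of that core rule, with stand-in type classes (not the PR's actual implementation):

```python
# Stand-in type markers; the real code uses the Spark API's DataType classes.
class NullType:
    pass

class LongType:
    pass

class StringType:
    pass

def merge_type(a, b):
    # Sketch of _merge_type's core rule: NULL yields to any concrete
    # type; two different concrete types cannot be merged.
    if isinstance(a, NullType):
        return b
    if isinstance(b, NullType):
        return a
    if type(a) is not type(b):
        raise TypeError(f"Cannot merge {type(a).__name__} and {type(b).__name__}")
    return a

assert isinstance(merge_type(NullType(), LongType()), LongType)
```

This is why a row like `{"a": None}` followed by `{"a": 1}` can still infer `a` as an integer column.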