[SPARK-48220][PYTHON] Allow passing PyArrow Table to createDataFrame() #46529
base: master
Conversation
Force-pushed from c408d0b to 795d01a
This makes the usage so much easier, thanks! What will happen with the nanosecond timestamps? Truncated to milliseconds?
Force-pushed from 005c4f7 to 7e34472
Truncated to microseconds for now.
I wish we had ns support in Spark 4.0, as Java, Parquet, Arrow, etc. use it natively now, but that's certainly a different discussion and separate MRs :) Thanks for the new API!
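For illustration, a small hedged example of the truncation behavior described above (assumes an active SparkSession bound to `spark`):

```python
# Nanosecond-precision PyArrow timestamps are truncated to microseconds
# (the precision of Spark's TimestampType) when the Table is converted.
import pyarrow as pa

t = pa.table({"ts": pa.array([1_700_000_000_123_456_789], type=pa.timestamp("ns"))})
spark.createDataFrame(t).show(truncate=False)  # sub-microsecond digits are dropped
```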
This PR is too big to follow and has many unrelated changes; can we break it into multiple PRs?
```diff
@@ -46,6 +46,7 @@
 if TYPE_CHECKING:
     from py4j.java_gateway import JavaObject
+    import pyarrow as pa
```
I think we cannot assume `pyarrow` is installed by default in Spark Classic: https://spark.apache.org/docs/latest/api/python/getting_started/install.html#dependencies
I think it is OK when `TYPE_CHECKING` is `True`. There is another file in the repo that has had `import pyarrow as pa` inside an `if TYPE_CHECKING:` conditional since 2021: https://github.com/apache/spark/pull/34101/files#diff-a4f1631a18d1b4921b8727e1d78059a1014c433239eb107962c473c6466214e5R29
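For context, a minimal sketch of the pattern being defended here: the import only runs for static type checkers, so `pyarrow` need not be installed at runtime (`f` is a hypothetical function for illustration):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved by mypy/IDEs only; TYPE_CHECKING is False at runtime, so this
    # import never executes and pyarrow is not a runtime dependency.
    import pyarrow as pa

def f(table: "pa.Table") -> None:  # string annotation avoids a runtime lookup
    ...
```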
```diff
@@ -343,28 +344,29 @@ def createDataFrame(
 @overload
 def createDataFrame(
-    self, data: "PandasDataFrameLike", samplingRatio: Optional[float] = ...
+    self, data: Union["PandasDataFrameLike", "pa.Table"], samplingRatio: Optional[float] = ...
```
Can we include `pa.Table` in `PandasDataFrameLike`?
I don't think so. `PandasDataFrameLike` is just an alias for `pd.DataFrame`.

Details

In `pyspark.sql.pandas._typing`, `DataFrameLike` is defined like this:

```python
from pandas.core.frame import DataFrame as PandasDataFrame

DataFrameLike = PandasDataFrame
```

Then in other parts of PySpark, `PandasDataFrameLike` is defined like this:

```python
from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
```

If we define `PandasDataFrameLike` as a Union that includes `pa.Table`, that will cause other problems. For example, then we can't use it as the return type of `toPandas()`.
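A hypothetical illustration of that objection: if the alias were widened to a Union, every consumer of the alias would be affected.

```python
from typing import Union
import pandas as pd
import pyarrow as pa

# The rejected definition: widening the alias to include pa.Table.
PandasDataFrameLike = Union[pd.DataFrame, pa.Table]

def toPandas() -> PandasDataFrameLike:
    # The annotation now wrongly advertises that callers might receive a
    # pa.Table, forcing them to narrow the type before using pandas methods.
    return pd.DataFrame()
```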
Yes, I am happy to break out the non-required changes into separate PRs.
@zhengruifeng I broke out the non-required changes into separate PRs and I simplified the implementations. I think it should be easier to follow now. More than half of the added code is tests.
Here is a PDF of the Apache Arrow in PySpark user guide page rendered from this PR, with the new and modified sections highlighted: Apache Arrow in PySpark — PySpark master documentation.pdf
python/pyspark/sql/pandas/types.py (outdated)

```python
return pa.ListArray.from_arrays(
    a.offsets,
    _check_arrow_array_timestamps_localize(a.values, at.elementType, truncate, timezone),
)
```
Also, this will not preserve nulls? (Similar to the issue you raised for map type, although ListArray already has a `mask` keyword. And we should also add something like apache/arrow#23380 to simply apply an existing validity bitmap buffer.)
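A small sketch of the concern: per the PyArrow docs, `ListArray.offsets` carries no validity bitmap, so round-tripping through `from_arrays` drops parent-level nulls unless `mask=` is passed.

```python
import pyarrow as pa

a = pa.array([[1, 2], None, [3]])
rebuilt = pa.ListArray.from_arrays(a.offsets, a.values)
print(a.null_count, rebuilt.null_count)  # 1 vs 0: the null list was dropped
preserved = pa.ListArray.from_arrays(a.offsets, a.values, mask=a.is_null())
print(preserved.null_count)  # 1: nulls kept by passing the mask explicitly
```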
I also confirmed that it's not necessary to pass `mask` when creating the dictionary array; the nulls in the dictionary indices pass through fine and are preserved.
I don't know how much more complex you want to make this part of the code, but one disadvantage of the current `mask` keyword is that it is not zero-copy (since it inverts the validity bitmap twice), so you might want to avoid having to do this as much as possible:

- You could check if the list's `value_type` is either timestamp or nested, and only in that case call `_check_arrow_array_timestamps_localize` on the values, and otherwise just return `a` (we have `pa.types.is_nested` that could help with that). That should already make any simple list of a numeric type fully zero-copy.
- If you do have to recreate the ListArray, you can probably do `mask=a.is_null() if a.null_count else None` to avoid allocating a full bitmap in case there are no missing values.
I think it is worthwhile if it keeps zero-copy. Added in 7ed45e0. I did it for MapArrays and StructArrays too.
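For reference, a minimal sketch of that zero-copy strategy (not necessarily the PR's exact code; `localize` is a hypothetical stand-in for `_check_arrow_array_timestamps_localize`):

```python
import pyarrow as pa

def _localize_list(a: pa.ListArray, localize) -> pa.ListArray:
    vt = a.type.value_type
    # Simple element types (numeric, string, ...) need no rewrite: returning
    # the array unchanged keeps this path fully zero-copy.
    if not (pa.types.is_timestamp(vt) or pa.types.is_nested(vt)):
        return a
    return pa.ListArray.from_arrays(
        a.offsets,
        localize(a.values),
        # Only materialize a mask when nulls actually exist, avoiding an
        # unnecessary bitmap allocation (and its double inversion) otherwise.
        mask=a.is_null() if a.null_count else None,
    )
```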
```diff
 # Convert the Spark DataFrame to a PyArrow Table
-table = df.select("*").toArrow()
+result_table = df.select("*").toArrow()
```
Out of curiosity, why do we explicitly call `.select("*")`?
I followed the pandas example (see below on line 69 of this same file). I was wondering this too, but I kept it just to match the pandas example. I'm happy to remove both if that would be better.
I suspect that the original purpose of the `.select("*")` was to represent some arbitrary transformations being lazily performed on the DataFrame. That way readers will know that this works when there are transformations.
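For readers of the guide, a small hedged example of that pattern (assumes an active SparkSession bound to `spark`; `toArrow()` comes from #45481):

```python
# The .select("*") stands in for an arbitrary chain of lazy transformations
# applied before converting the result to a PyArrow Table.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
result_table = df.select("*").toArrow()
print(result_table.schema)  # id: int64, value: string
```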
```python
# If no schema supplied by user then get the names of columns only
if schema is None:
    _cols = data.column_names
if isinstance(schema, (list, tuple)) and cast(int, _num_cols) < len(data.columns):
```
nit:

```python
if isinstance(schema, (list, tuple)) and cast(int, _num_cols) < len(data.columns):
    assert isinstance(_cols, list)
    _cols.extend([f"_{i + 1}" for i in range(cast(int, _num_cols), len(data.columns))])
    _num_cols = len(_cols)
```

is hard to follow and duplicated; shall we extract and reuse, or add a comment to both to help understand? Feel free to do it in a separate PR.
This is borrowed directly from the pandas section above (line 516). I would be happy to open a subsequent PR to try to deduplicate the logic and make it clearer.
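A hypothetical helper sketching the deduplication suggested above (the name and placement are illustrative, not the PR's code):

```python
from typing import List, Sequence

def _pad_column_names(cols: Sequence[str], num_data_cols: int) -> List[str]:
    # Pad a user-supplied column-name list with generated positional names
    # ("_2", "_3", ...) when it names fewer columns than the data provides.
    padded = list(cols)
    padded.extend(f"_{i + 1}" for i in range(len(padded), num_data_cols))
    return padded

# _pad_column_names(["a"], 3) -> ["a", "_2", "_3"]
```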
What changes were proposed in this pull request?
Adds the ability to pass a PyArrow Table as the `data` argument of `createDataFrame()`, complementing the existing `toArrow()` method.

Why are the changes needed?
This seems like a logical next step after the addition of a `toArrow()` DataFrame method in #45481.

Does this PR introduce any user-facing change?
Users will have the ability to pass PyArrow Tables to `createDataFrame()`. There are no changes to the parameters of `createDataFrame()`. The only difference is that `data` can now be a PyArrow Table.
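A hedged usage sketch of the new capability (assumes an active SparkSession bound to `spark`):

```python
import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
df = spark.createDataFrame(table)  # schema inferred from the Table's schema
df.show()
```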
How was this patch tested?
Many tests were added, for Spark Classic and Spark Connect. I ran the tests locally with older versions of PyArrow installed (going back to 10.0).
Was this patch authored or co-authored using generative AI tooling?
No