
[SPARK-48220][PYTHON] Allow passing PyArrow Table to createDataFrame() #46529

Open
wants to merge 29 commits into base: master

Conversation

ianmcook
Member

@ianmcook ianmcook commented May 10, 2024

What changes were proposed in this pull request?

  • Add support for passing a PyArrow Table to createDataFrame().
  • Document this on the Apache Arrow in PySpark user guide page.
  • Fix an issue with timestamp and struct columns in toArrow().

Why are the changes needed?

This seems like a logical next step after the addition of a toArrow() DataFrame method in #45481.

Does this PR introduce any user-facing change?

Users will have the ability to pass PyArrow Tables to createDataFrame(). There are no changes to the parameters of createDataFrame(). The only difference is that data can now be a PyArrow Table.
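For example (a minimal, hypothetical sketch; the column names and values are illustrative, not taken from the PR's tests):

import pyarrow as pa
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A PyArrow Table can now be passed directly as the data argument.
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
df = spark.createDataFrame(table)
df.show()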

How was this patch tested?

Many tests were added for both Spark Classic and Spark Connect. I ran the tests locally with older versions of PyArrow installed (going back to 10.0).

Was this patch authored or co-authored using generative AI tooling?

No

@ianmcook ianmcook force-pushed the SPARK-48220 branch 8 times, most recently from c408d0b to 795d01a on May 13, 2024 22:14
@alippai

alippai commented May 14, 2024

This makes the usage so much easier, thanks!

What will happen with the nanosecond timestamps? Truncated to milliseconds?

@ianmcook ianmcook force-pushed the SPARK-48220 branch 4 times, most recently from 005c4f7 to 7e34472 on May 15, 2024 16:15
@ianmcook
Member Author

ianmcook commented May 15, 2024

What will happen with the nanosecond timestamps? Truncated to milliseconds?

Truncated to microseconds for now.
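To illustrate what that truncation means on the Arrow side (a hedged sketch of the behavior, not the PR's internal code path):

import pyarrow as pa

# A nanosecond-precision timestamp value...
ns = pa.array([1_700_000_000_123_456_789], type=pa.timestamp("ns"))
# ...loses its sub-microsecond digits when reduced to microsecond precision,
# which is what Spark's TimestampType supports; safe=False permits the lossy cast.
us = ns.cast(pa.timestamp("us"), safe=False)
print(ns[0], "->", us[0])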

@alippai

alippai commented May 15, 2024

I wish we had ns support in Spark 4.0, as Java, Parquet, Arrow, etc. use it natively now, but that's certainly a different discussion and set of MRs :)

Thanks for the new API!

Contributor

@zhengruifeng zhengruifeng left a comment


This PR is too big to follow and has many unrelated changes. Can we break it into multiple PRs?

python/pyspark/sql/connect/session.py (resolved thread)
@@ -46,6 +46,7 @@

 if TYPE_CHECKING:
     from py4j.java_gateway import JavaObject
+    import pyarrow as pa
Contributor

I think we cannot assume pyarrow is installed by default in Spark Classic:

https://spark.apache.org/docs/latest/api/python/getting_started/install.html#dependencies

@HyukjinKwon

Member Author

@ianmcook ianmcook May 21, 2024


I think it is OK when TYPE_CHECKING is True. There is another file in the repo that has had import pyarrow as pa inside an if TYPE_CHECKING: conditional since 2021: https://github.com/apache/spark/pull/34101/files#diff-a4f1631a18d1b4921b8727e1d78059a1014c433239eb107962c473c6466214e5R29
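As a minimal standalone illustration of the pattern (not PySpark code; the function name is made up):

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by static type checkers such as mypy; never executed at
    # runtime, so pyarrow remains an optional dependency.
    import pyarrow as pa

def num_cells(table: "pa.Table") -> int:
    # The quoted annotation is never resolved at runtime, so importing this
    # module succeeds even when pyarrow is not installed.
    return table.num_rows * table.num_columns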

@@ -343,28 +344,29 @@ def createDataFrame(

     @overload
     def createDataFrame(
-        self, data: "PandasDataFrameLike", samplingRatio: Optional[float] = ...
+        self, data: Union["PandasDataFrameLike", "pa.Table"], samplingRatio: Optional[float] = ...
Contributor

can we include pa.Table in PandasDataFrameLike?

Member Author

@ianmcook ianmcook May 21, 2024

I don't think so. PandasDataFrameLike is just an alias for pd.DataFrame.

Details

In pyspark.sql.pandas._typing, DataFrameLike is defined like this:

from pandas.core.frame import DataFrame as PandasDataFrame
DataFrameLike = PandasDataFrame

Then in other parts of PySpark, PandasDataFrameLike is defined like this:

from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike

If we define PandasDataFrameLike as a Union that includes pa.Table, that will cause other problems. For example, then we can't use it as the return type of toPandas().
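To make the problem concrete (hypothetical code, not part of this PR): widening the alias to a Union would also widen the declared return type of toPandas():

from typing import Union

import pandas as pd
import pyarrow as pa

# Hypothetical widened alias:
PandasDataFrameLike = Union[pd.DataFrame, pa.Table]

# This signature would now claim that toPandas() may return a pyarrow Table,
# which it never does, forcing callers into unnecessary isinstance checks.
def toPandas() -> PandasDataFrameLike:
    ...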

python/pyspark/sql/pandas/types.py (outdated, resolved thread)
python/pyspark/sql/pandas/types.py (outdated, resolved thread)
@ianmcook
Member Author

This PR is too big to follow and has many unrelated changes. Can we break it into multiple PRs?

Yes, I am happy to break out the non-required changes into separate PRs.

@ianmcook
Member Author

@zhengruifeng I broke out the non-required changes into separate PRs and I simplified the implementations. I think it should be easier to follow now. More than half of the added code is tests.

@ianmcook
Member Author

ianmcook commented May 21, 2024

Here is a PDF of the Apache Arrow in PySpark user guide page rendered from this PR, with the new and modified sections highlighted: Apache Arrow in PySpark — PySpark master documentation.pdf

python/pyspark/sql/connect/session.py (outdated, resolved thread)
python/pyspark/sql/pandas/types.py (resolved thread)
Comment on lines 365 to 368
    return pa.ListArray.from_arrays(
        a.offsets,
        _check_arrow_array_timestamps_localize(a.values, at.elementType, truncate, timezone),
    )
Member

Also, this will not preserve nulls? (Similar to the issue you raised for the map type, although ListArray already has a mask keyword. We should also add something like apache/arrow#23380 to simply apply an existing validity bitmap buffer.)

Member Author

Thanks for catching this. Fixed in 5db5e4b. Added tests in fc758e6 to fail if nulls are not preserved here.

Member Author

I also confirmed that it's not necessary to pass mask when creating the dictionary array; the nulls in the dictionary indices pass through fine and are preserved.

Member

I don't know how much more complex you want to make this part of the code, but one disadvantage of the current mask keyword is that it is not zero-copy (since it inverts the validity bitmap twice), so you might want to avoid having to use it as much as possible:

  • you could check whether the list's value_type is either timestamp or nested, and only in that case call _check_arrow_array_timestamps_localize on the values, and otherwise just return a (we have pa.types.is_nested that could help with that). That should already make any simple list of a numeric type fully zero-copy
  • if you do have to recreate the ListArray, you can probably do mask=a.is_null() if a.null_count else None to avoid allocating a full bitmap in case there are no missing values (see the sketch below)
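For illustration, a minimal sketch of what the two suggestions could look like together. The helper _check_arrow_array_timestamps_localize is the one from this PR and is assumed to be in scope; the wrapper function itself is hypothetical, not the actual implementation:

import pyarrow as pa
import pyarrow.types as pat

def _localize_list_array(a: pa.ListArray, element_type, truncate, timezone) -> pa.Array:
    value_type = a.type.value_type
    # Fast path: simple value types (e.g. int64, string) cannot contain
    # timestamps, so return the array unchanged (fully zero-copy).
    if not (pat.is_timestamp(value_type) or pat.is_nested(value_type)):
        return a
    # Recurse into the child values with the PR's helper.
    new_values = _check_arrow_array_timestamps_localize(a.values, element_type, truncate, timezone)
    # Only pass a mask when there actually are nulls to preserve, avoiding an
    # unnecessary bitmap allocation (and the double inversion) otherwise.
    mask = a.is_null() if a.null_count else None
    return pa.ListArray.from_arrays(a.offsets, new_values, mask=mask)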

Member Author

@ianmcook ianmcook May 22, 2024

I think it is worthwhile if it keeps things zero-copy. Added in 7ed45e0. I did it for MapArrays and StructArrays too.

python/pyspark/sql/pandas/types.py (resolved thread)

 # Convert the Spark DataFrame to a PyArrow Table
-table = df.select("*").toArrow()
+result_table = df.select("*").toArrow()
Member

Out of curiosity, why do we explicitly call .select("*")?

Member Author

I followed the pandas example (see below on line 69 of this same file). I was wondering this too, but I kept it just to match the pandas example. I'm happy to remove both if that would be better.

Member Author

@ianmcook ianmcook May 22, 2024

I suspect that the original purpose of the .select("*") was to represent some arbitrary transformations being lazily performed on the dataframe. That way readers will know that this works when there are transformations.
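A small sketch of that reading (illustrative data, not the docs example itself): any chain of lazy transformations can precede toArrow(), not just select("*").

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "name"])

# An arbitrary transformation in place of select("*"):
result_table = df.filter(df.id > 1).toArrow()
print(result_table.num_rows)  # 2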

# If no schema supplied by user then get the names of columns only
if schema is None:
    _cols = data.column_names
if isinstance(schema, (list, tuple)) and cast(int, _num_cols) < len(data.columns):
Member

nit:

            if isinstance(schema, (list, tuple)) and cast(int, _num_cols) < len(data.columns):
                assert isinstance(_cols, list)
                _cols.extend([f"_{i + 1}" for i in range(cast(int, _num_cols), len(data.columns))])
                _num_cols = len(_cols)

is hard to follow and duplicated. Shall we extract and reuse it, or add a comment to both copies to help understanding? Feel free to do it in a separate PR.
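To spell out what the snippet does (a standalone rerun of the logic outside PySpark; variable names mirror the PR):

import pyarrow as pa

table = pa.table({"c0": [1], "c1": [2], "c2": [3], "c3": [4]})
schema = ["a", "b"]          # the user supplied only two names
_cols = list(schema)
_num_cols = len(_cols)
if isinstance(schema, (list, tuple)) and _num_cols < len(table.columns):
    # Extra columns get positional placeholder names: "_3", "_4", ...
    _cols.extend([f"_{i + 1}" for i in range(_num_cols, len(table.columns))])
    _num_cols = len(_cols)
print(_cols)  # ['a', 'b', '_3', '_4']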

Member Author

This is borrowed directly from the pandas section above (line 516). I would be happy to open a subsequent PR to try to deduplicate the logic and make it clearer.
