
[QUESTION] Pandas mutability causes different results compared to Spark and DuckDB #356

Open
lukeb88 opened this issue Sep 7, 2022 · 9 comments

lukeb88 commented Sep 7, 2022

The following code:

import pandas as pd
df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [ 2,  3,  4]})
df2 = pd.DataFrame({'col1': [1, 2, 3], 'col4': [11, 12, 13]})

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

from fugue_sql import fsql
res = fsql(
    '''
    transformed = TRANSFORM df1 USING make_new_col
    YIELD DATAFRAME AS partial_result

    SELECT transformed.*, df2.col4
    FROM transformed
    INNER JOIN df2 ON transformed.col1 = df2.col1
    YIELD DATAFRAME AS result
    '''
).run('duckdb')

works the same way with DuckDB, pandas, or Spark as the engine, returning:

res['partial_result'].as_pandas()
col1  col2  col3
   1     2     3
   2     3     5
   3     4     7

res['result'].as_pandas()
col1  col2  col3  col4
   1     2     3    11
   2     3     5    12
   3     4     7    13

But if I change the first line of the SQL from:
transformed = TRANSFORM df1 USING make_new_col
To:
transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col

I obtain two different results: one for pandas and another for DuckDB and Spark. With pandas the results remain the same as above; with the other engines, res['partial_result'] is still the same, but res['result'] is different:

col1  col2  col4
   1     2    11
   2     3    12
   3     4    13

It seems that in the JOIN operation, transformed was missing the col3 column generated by the make_new_col function.

Adding a PRINT transformed after the first yield (YIELD DATAFRAME AS partial_result), I see that, for pandas as well as Spark and DuckDB, transformed does not contain the new col3.

I don't understand two things at this point:

  1. Why does transformed not contain col3? What is wrong with transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col?
  2. If (for pandas) transformed does not contain col3, how is it possible that after the JOIN I obtain a result that also has col3?
kvnkho (Collaborator) commented Sep 7, 2022

Hey @lukeb88,

Thanks for the well-written issue. You've obviously spent a lot of time learning FugueSQL. I can answer the first question immediately while I look into the second. First, we need to break this expression down. The TLDR here is that col3 is definitely in partial_result. We just need clarification on what is being shown by PRINT. partial_result is indeed the same across the engines.

Look at this.

transformed = TRANSFORM df1 USING make_new_col

TRANSFORM is a statement on its own, and SELECT is also a separate statement. This is why you can use TRANSFORM without SELECT.

So when you changed it to:

transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col

There are actually two statements here: the SELECT and the TRANSFORM. transformed takes the value of the SELECT statement, not the output of the TRANSFORM. The TRANSFORM just takes advantage of anonymity to operate on the output of the SELECT statement, which simplifies the syntax.

If we write it out fully with the PRINT you added:

query = '''
    transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col
    YIELD DATAFRAME AS partial_result
    PRINT transformed
    ...
    '''

transformed is just the output of the SELECT. Compare that to this:

query = '''
    transformed = SELECT * FROM df1 WHERE 1=1 
    transformed2 = TRANSFORM transformed USING make_new_col
    YIELD DATAFRAME AS partial_result
    PRINT transformed2
'''

And you will find col3 is indeed there and it is consistent across all DuckDB, Spark, and Pandas.

kvnkho (Collaborator) commented Sep 7, 2022

For the second question, there is something we should fix. It is a bug for the behavior to be inconsistent across execution engines, but I can explain why this happens and give you a way to work around it for now.

This has to do with Pandas being mutable and Spark/DuckDB being immutable. For example, the code below has a function that does not return anything after performing an operation.

import pandas as pd
df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [ 2,  3,  4]})

def make_new_col(df):
    df['col3'] = df['col1'] + df['col2']
    
make_new_col(df1)
df1.head()

Still, the output of df1 will contain the new column because it was mutated.

col1  col2  col3
   1     2     3
   2     3     5
   3     4     7
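By contrast, passing a defensive copy keeps the mutation from ever reaching the original (a quick pandas-only illustration of the same point; the df2 name here is just for this snippet):

```python
import pandas as pd

df2 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

def make_new_col(df):
    # same mutating function as above: writes col3 into the caller's frame
    df['col3'] = df['col1'] + df['col2']

make_new_col(df2.copy())      # the copy is mutated and then discarded
print('col3' in df2.columns)  # False: the original is untouched
```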

But doing the same thing on PySpark results in col3 not being present:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [ 2,  3,  4]})
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df1)

def make_new_col(sdf):
    sdf = sdf.withColumn('col3', col('col1') + col('col2'))
    
make_new_col(sdf)
sdf.show()

results in:

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   2|   3|
|   3|   4|
+----+----+

So let's recall in FugueSQL that you had the following statement:

    transformed = SELECT * FROM df1 WHERE 1=1 TRANSFORM USING make_new_col
    YIELD DATAFRAME AS partial_result

And remember that SELECT and TRANSFORM are two different statements here. transformed only captures the output of the SELECT statement and the TRANSFORM is performed afterward. For the Pandas backend, these are the same DataFrame while for Spark/DuckDB, they are different. For Pandas, that TRANSFORM call was able to edit the transformed DataFrame.

Just to be clear, the YIELD DATAFRAME happens on the output of the TRANSFORM because of anonymity.

So when you do:

    SELECT transformed.*, df2.col4
    FROM transformed

on the Pandas backend, it will have col3 but on the Spark and DuckDB backend, it won't. The quick fix for now is to name the output of TRANSFORM and use that in the join.

query = '''
    transformed = SELECT * FROM df1 WHERE 1=1 
    transformed2 = TRANSFORM transformed USING make_new_col
    YIELD DATAFRAME AS partial_result

    SELECT transformed2.*, df2.col4
    FROM transformed2
    INNER JOIN df2 ON transformed2.col1 = df2.col1
    YIELD DATAFRAME AS result
    '''

I tested this on all three backends and got consistent results. Just let me know if you still see something else. You may get faster help on Slack too if you're willing to join.

Appreciate you digging into FugueSQL!

kvnkho (Collaborator) commented Sep 7, 2022

One last thing I forgot to mention is that the function

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df['col3'] = df['col1'] + df['col2']
    return df

is the reason for the direct mutation of the Pandas DataFrame. You can change it to:

# schema: *, col3:int
def make_new_col(df: pd.DataFrame) -> pd.DataFrame:
    df = df.assign(col3=df['col1'] + df['col2'])
    return df

which will create a copy of the DataFrame rather than mutating the input. That way, col3 will be missing from transformed on all backends (pandas, Spark, and DuckDB), and the behavior will be consistent.
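To see the difference in isolation (a minimal pandas-only sketch, outside of Fugue):

```python
import pandas as pd

def mutate(df):
    df['col3'] = df['col1'] + df['col2']  # writes into the caller's DataFrame
    return df

def with_assign(df):
    return df.assign(col3=df['col1'] + df['col2'])  # builds a new DataFrame

a = pd.DataFrame({'col1': [1, 2], 'col2': [2, 3]})
b = pd.DataFrame({'col1': [1, 2], 'col2': [2, 3]})

mutate(a)
with_assign(b)

print('col3' in a.columns)  # True: a was mutated in place
print('col3' in b.columns)  # False: b was left unchanged
```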

I know this function was lifted off the tutorials. That's because I am trying to write code more familiar to Pandas users even if it may not be best practice. I guess it's coming back to bite me. 😬

lukeb88 (Author) commented Sep 8, 2022

Hi @kvnkho,

Thank you for your clear and detailed answers! Everything is much clearer to me now.

My two cents on the problem raised by the mutability of pandas and the immutability of Spark/DuckDB: I have only seen a little bit of Fugue's code, and obviously I don't know how it works under the hood. However, couldn't a deep copy of the dataframe (with the pandas engine) be automatically passed to the Python functions, in a way that is completely transparent to the user? There would obviously be no need for this in the case of Spark/DuckDB...
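For illustration, the idea could look something like this (run_transform is a hypothetical wrapper written for this sketch, not Fugue's actual internals):

```python
import pandas as pd

def run_transform(df, func, engine):
    # Hypothetical sketch: only the pandas engine needs a defensive copy,
    # since Spark/DuckDB hand the function an immutable frame anyway.
    if engine == 'pandas':
        df = df.copy(deep=True)
    return func(df)

df1 = pd.DataFrame({'col1': [1, 2, 3], 'col2': [2, 3, 4]})

def make_new_col(df):
    df['col3'] = df['col1'] + df['col2']  # mutating transformer
    return df

out = run_transform(df1, make_new_col, engine='pandas')
print('col3' in out.columns)  # True: the result carries the new column
print('col3' in df1.columns)  # False: the user's input was not mutated
```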

lukeb88 (Author) commented Sep 8, 2022

P.S. Now that I understand better how it works (just to complete your answer in case anyone else runs into this), another way to keep consistency across all three engines is:

transformed = TRANSFORM (SELECT * FROM df1 WHERE 1=1) USING make_new_col
YIELD DATAFRAME AS partial_result

PRINT transformed

SELECT transformed.*, df2.col4
FROM transformed
INNER JOIN df2 ON transformed.col1 = df2.col1
YIELD DATAFRAME AS result

goodwanghan (Collaborator) commented Sep 8, 2022

"Couldn't a deep copy of the dataframe (with the pandas engine) be automatically passed to the Python functions, in a way that is completely transparent to the user? There would obviously be no need for this in the case of Spark/DuckDB..."

@lukeb88 that is a very good question, and I have to say it was a painful decision. A deepcopy can be very slow, but without one it's not possible to prevent users from mutating the input dataframe (if they don't follow good practice). Our decision here favored performance, and we expect users to follow good practices. Given this context, what is your opinion about the choices?

lukeb88 (Author) commented Sep 8, 2022

@goodwanghan I understand that it is a difficult choice.

(Assuming the deepcopy can be done conditionally, only when pandas has been chosen as the engine.)

I would probably go with the deepcopy, keeping in mind that it becomes a problem with large datasets, but at that point it would probably not make sense to use pandas as the engine anyway.

I understand that you place emphasis on performance, but on the other hand, Fugue is an interface, and perhaps it's even more important that it be 100% consistent across all engines...

kvnkho (Collaborator) commented Sep 8, 2022

@lukeb88, that is very well articulated. Ideally, users shouldn't be using pandas for larger datasets anyway. Thanks for that!

And your example of using SELECT inline is very elegant!

@kvnkho kvnkho changed the title [QUESTION] fsql with Pandas or DuckDB engine seems to have different behaviour [QUESTION] Pandas mutability causes different results compared to Spark and DuckDB Sep 8, 2022
goodwanghan (Collaborator) commented

@lukeb88 I think it is very well said, and it also aligns with Fugue's priority: consistency is more important than performance. I will create a PR to make the change, or if you are interested, you can create your first PR for Fugue :)
