[QUESTION] How to use a CoTransformer on data frames with shared non-key columns #358

jstammers opened this issue Sep 12, 2022 · 4 comments


@jstammers

I have a function that aims to implement an SCD2 merge on two dataframes.

In my example, I am attempting to merge two dataframes, using a single column as the key. The transformation should modify rows with a matching key and insert all rows from the second dataframe.

When I execute this code, the zip method by default performs an inner join on the columns that are duplicated across the dataframes. This has the effect of dropping rows with missing values in shared columns, which means the dataframes passed to scd2_merge are not what I expect.

import pandas as pd
from fugue import DataFrames, FugueWorkflow

df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})


# schema: a:int,b:float,c:int
def scd2_merge(dfs: DataFrames) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    df1 = dfs[0].as_pandas()
    df2 = dfs[1].as_pandas()
    # expire rows in df1 whose key also appears in df2
    df1.loc[df1[ix].isin(df2[ix]), "c"] = 0
    # rows coming from df2 become the current records
    df2["c"] = 1
    return pd.concat([df1, df2])


with FugueWorkflow(engine='pandas') as dag:
    df1 = dag.df(df1, "a:int,b:float,c:int")
    df2 = dag.df(df2, "a:int,b:float")
    dag.zip(df1, df2).transform(scd2_merge).show()

Is there a correct way to implement this type of transformation?

@goodwanghan
Collaborator

You can do

df1.partition_by("a").zip(df2).transform(scd2_merge).show()

or

dag.zip(df1, df2, partition={"by":"a"}).transform(scd2_merge).show()

@goodwanghan
Collaborator

I also modified your code a little bit to follow good practices:

import pandas as pd
from fugue import FugueWorkflow

_df1 = pd.DataFrame({"a": [1, 2, 3], "b": [1, None, 3], "c": [1, 1, 1]})
_df2 = pd.DataFrame({"a": [2, 3, 4], "b": [2, 3, None]})


# schema: a:int,b:float,c:int
def scd2_merge(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """Performs an SCD2-type merge.

    Any rows in df1 that have a matching key value in df2 will have
    their current flag 'c' set to 0, before the rows in df2 are inserted.
    """
    ix = "a"
    # 'c' is False (0) for rows whose key also appears in df2 (expired),
    # and True (1) otherwise
    df1 = df1.assign(c=~df1[ix].isin(df2[ix]))
    df2 = df2.assign(c=1)
    return pd.concat([df1, df2])


dag = FugueWorkflow()
df1 = dag.df(_df1)
df2 = dag.df(_df2)
df1.partition_by("a").zip(df2).transform(scd2_merge).show()

dag.run()
  1. Reduced the dependency on Fugue: the inputs of scd2_merge can all be native types
  2. Removed the with statement and the predefined engine on it. The DAG definition doesn't require a context manager, and dag.run() should be a separate step
  3. Removed the unnecessary schemas when calling dag.df
  4. Used assign to avoid mutating the dataframes (pandas good practice)
  5. Renamed the module-level df* to _df* (Python good practice)

@jstammers
Author

Hi @goodwanghan, thanks for the suggestions - I've been able to modify my code as you suggested and get it working with the partition_by.zip syntax. For my particular use case, I needed to use

.zip(df2, how="full_outer")

to ensure that I had non-intersecting keys in the output.
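
Putting it together, a minimal sketch of the adjusted line (assuming the same dag, df1 and df2 as in the example above):

df1.partition_by("a").zip(df2, how="full_outer").transform(scd2_merge).show()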

Also, I intend to run this in production using the SparkExecutionEngine. Is there anything about this workflow I should be aware of that could affect performance?

@goodwanghan
Collaborator

goodwanghan commented Sep 13, 2022

Wonderful! When you test on Spark, don't use SparkExecutionEngine; you can just pass in the Spark session:

dag.run(spark_session)
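
For example, a minimal sketch assuming a standard PySpark setup:

from pyspark.sql import SparkSession

# build or reuse a SparkSession and pass it directly to the workflow
spark_session = SparkSession.builder.getOrCreate()
dag.run(spark_session)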

spark.sql.shuffle.partitions should be set properly (this applies to Spark execution in general).
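
For example, it can be set on the session at runtime (the value below is purely illustrative; tune it to your data and cluster):

spark_session.conf.set("spark.sql.shuffle.partitions", "200")  # illustrative value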

You can also enable fugue.spark.use_pandas_udf to see if it is faster, although I think for your case it may not have much effect.
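
As a sketch, assuming your Fugue version accepts an engine conf dict as the second argument to dag.run (check the docs for your version):

dag.run(spark_session, {"fugue.spark.use_pandas_udf": True})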
