Use @inject with @pipe decorators #748

Open
nhuray opened this issue Mar 8, 2024 · 1 comment
Labels
triage label for issues that need to be triaged.

Comments

@nhuray
Contributor

nhuray commented Mar 8, 2024

Following a Slack conversation with @skrawcz and @elijahbenizzy: while experimenting, @skrawcz discovered that using the @inject decorator in conjunction with the @pipe decorator does not work.

Current behavior

@skrawcz, please add a fuller description of the current behavior here.

Expected behavior

The decorators should work in conjunction.

nhuray added the triage label Mar 8, 2024
@skrawcz
Collaborator

skrawcz commented Mar 8, 2024

Stack trace:

Traceback (most recent call last):
ValueError: Cannot call NodeExpander: <class 'hamilton.function_modifiers.expanders.inject'> on more than one node. This must be called first in the DAG. Called with [<A_processed {'module': 'functions'}>, <A_processed.with_echo {'module': 'functions'}>, <A_processed.with_echo_1 {'module': 'functions'}>] 
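
Reading the message literally, @inject received three nodes rather than one, which suggests the @pipe steps had already been added by the time @inject ran. Below is a toy model of that reading in plain Python. It is not Hamilton's actual code, and `toy_pipe` and `toy_inject` are made-up names used only for illustration.

```python
from typing import List


def toy_pipe(nodes: List[str], step_names: List[str]) -> List[str]:
    """Hypothetical stand-in for @pipe: keep each node and add one node per step."""
    out = list(nodes)
    for base in nodes:
        out.extend(f"{base}.with_{step}" for step in step_names)
    return out


def toy_inject(nodes: List[str]) -> List[str]:
    """Hypothetical stand-in for @inject: refuse to run on more than one node."""
    if len(nodes) != 1:
        raise ValueError(f"expander called on {len(nodes)} nodes: {nodes}")
    return nodes


# One function node grows to three once the two steps are attached ...
expanded = toy_pipe(["A_processed"], ["echo", "echo_1"])

# ... so the single-node check fails, mirroring the shape of the trace above.
try:
    toy_inject(expanded)
    error = None
except ValueError as exc:
    error = str(exc)

print(error)
```

If this reading is right, the failure comes from the order in which the two decorators are resolved rather than from anything specific to `_echo` or the DataFrames.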

Code:

# functions.py - declare and link your transformations as functions....
import pandas as pd

from hamilton.function_modifiers import extract_fields, inject, source, pipe, step

@extract_fields(
    {
        "col1 == 'A' and col2 =='B'": pd.DataFrame,
        "col1 == 'B' and col2 =='C'": pd.DataFrame,
    }
)
def dataframe_partition(partitions: list[str], df: pd.DataFrame) -> dict:
    # get a dict by grouping
    # return dict of name to dataframe
    return {
        "col1 == 'A' and col2 =='B'": pd.DataFrame({"a": [1, 2, 3]}),
        "col1 == 'B' and col2 =='C'": pd.DataFrame({"b": [1, 2, 3]}),
    }
            

def _echo(value: pd.DataFrame, v: int) -> pd.DataFrame:
    print(v)
    return value


@pipe(
    step(_echo, v=1),
    step(_echo, v=2),
)
@inject(A=source("col1 == 'A' and col2 =='B'"))
# A should be applied on "A White Horse" Dataframe
def A_processed(A: pd.DataFrame) -> pd.DataFrame:
    print("I've just done a ton of transformations, each one of which is a node in the DAG")
    return A  # A is passed in already transformed by the pipe steps

# run.py
# And run them!
import functions
from hamilton import driver
dr = driver.Driver({}, functions)
dr.display_all_functions(
   "graph.dot", orient="TB", show_legend=False)
