[feature] Support operator_arguments injection at a node level #881

Open
linchun3 opened this issue Mar 7, 2024 · 5 comments
Labels
  • area:config: Related to configuration, like YAML files, environment variables, or executer configuration
  • parsing:custom: Related to custom parsing, like custom DAG parsing, custom DBT parsing, etc
  • triage-needed: Items need to be reviewed / assigned to milestone

@linchun3
Contributor

linchun3 commented Mar 7, 2024

Context

Currently, we can use operator_args to pass in task-level arguments to all tasks in a DbtTaskGroup or DbtDAG.

However, this applies across all tasks in the DAG/TaskGroup. Sometimes, we'd like granular control over what is passed into each model's operator.

Use case(s)

  • A few models that require a longer retry_delay than other models in the same DAG.
  • A few models that require more retries than average models.
  • Setting trigger_rule to ONE_SUCCESS for a particular model.

Potential Solution

Currently, we can achieve this by doing the following:

  1. Define Airflow operator args in the model's YAML file under config (or config.meta):
version: 2
models:
  - name: model_a
    config:
      alias: model_a
      operator_args:
        retry_delay: 500
        retries: 10
  2. Leverage node_converters:
from __future__ import annotations  # allows the `X | Y` return annotation on Python < 3.10

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
from cosmos.airflow.graph import generate_task_or_group
from cosmos.dbt.graph import DbtNode


def inject_operator_args(
    dag: DAG,
    task_group: TaskGroup,
    node: DbtNode,
    **kwargs,
) -> BaseOperator | TaskGroup | None:
    """
    Return a task or group after injecting operator args
    if present in a model/test config
    """
    # Node-level `operator_args` take precedence over the shared `task_args`;
    # nodes without the key are rendered unchanged.
    operator_args = node.config.get("operator_args") or {}
    if operator_args:
        kwargs["task_args"] = {**kwargs.get("task_args", {}), **operator_args}

    return generate_task_or_group(
        dag=dag,
        task_group=task_group,
        node=node,
        **kwargs,
    )
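
For illustration, the precedence that the converter applies can be sketched without Airflow or Cosmos installed. `FakeNode` and `merged_task_args` are hypothetical names used only for this example and are not part of Cosmos; the sketch assumes `operator_args` sits directly under the node's `config`, as in the YAML above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeNode:
    """Hypothetical stand-in for a dbt node; only `config` is modelled."""

    name: str
    config: dict[str, Any] = field(default_factory=dict)


def merged_task_args(node: FakeNode, task_args: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of task_args with the node's `operator_args` layered on top."""
    overrides = node.config.get("operator_args") or {}
    return {**task_args, **overrides}


# Arguments shared by every task in the DAG/TaskGroup (illustrative values).
shared = {"retries": 2, "install_deps": True}

model_a = FakeNode(
    "model_a",
    {"alias": "model_a", "operator_args": {"retry_delay": 500, "retries": 10}},
)
model_b = FakeNode("model_b")  # no node-level overrides

args_a = merged_task_args(model_a, shared)
args_b = merged_task_args(model_b, shared)
```

Here `args_a` picks up `retries: 10` and `retry_delay: 500`, `args_b` keeps the shared values, and `shared` itself is left unmodified, so one model's overrides cannot leak into another model's task.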

Conclusion

The above solution works. However, this feels like it should be the default behaviour of Cosmos.


dosubot bot commented Mar 7, 2024

That sounds like a great idea! Your proposed solution seems well thought out and could definitely add value to the project. We encourage you to go ahead and create a pull request with your changes. Your contribution would be much appreciated by the community. Thank you for your willingness to improve the project!


@dosubot dosubot bot added area:config Related to configuration, like YAML files, environment variables, or executer configuration parsing:custom Related to custom parsing, like custom DAG parsing, custom DBT parsing, etc labels Mar 7, 2024
@dwreeves
Collaborator

dwreeves commented Mar 9, 2024

If Astronomer were to commit to an API that reaches further into the dbt schema (note that with your suggestion, this would be the first time Cosmos directly dictates how dbt code is written), I think that placing all Cosmos configuration inside config.cosmos rather than config.operator_args would be a safer bet. So something like this:

version: 2
models:
  - name: model_a
    config:
      alias: model_a
      cosmos:
        operator_args:
          retry_delay: 500
          retries: 10

It is not unreasonable to think of a world where Cosmos requires more keys than just operator_args, and keeping them nice and tidy inside a cosmos: mapping is probably the safer option.
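
A rough sketch of how a converter might read that nested key, assuming the custom `cosmos` mapping arrives unchanged in the node's config. `node_operator_args` is a hypothetical helper, not an existing Cosmos function:

```python
from typing import Any, Mapping


def node_operator_args(node_config: Mapping[str, Any]) -> dict[str, Any]:
    """Extract `cosmos.operator_args` from a node config, defaulting to {}."""
    cosmos_config = node_config.get("cosmos") or {}
    if not isinstance(cosmos_config, Mapping):
        raise TypeError("`config.cosmos` must be a mapping")

    operator_args = cosmos_config.get("operator_args") or {}
    if not isinstance(operator_args, Mapping):
        raise TypeError("`config.cosmos.operator_args` must be a mapping")

    # Return a copy so callers cannot mutate the parsed dbt config.
    return dict(operator_args)


config = {
    "alias": "model_a",
    "cosmos": {"operator_args": {"retry_delay": 500, "retries": 10}},
}
extracted = node_operator_args(config)
```

Nodes without a `cosmos` key simply yield an empty dict, so existing projects would behave as before, and other keys could later be added under `cosmos:` without touching this lookup.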

With that slight change in the API I would fully endorse this feature. It comes up super often that you need to do stuff like this. (I actually have a related but separate issue I was intending on opening in the near future relating to providing easier access to customization of how Cosmos runs...)

@linchun3
Contributor Author

linchun3 commented Mar 9, 2024

Hey @dwreeves ,

Thanks for reviewing this issue quickly.

I agree entirely with your proposed change in API 😀

  • There's a caveat to adding a new key that is not defined in dbt's schema:

Most configurations are "clobbered" when applied hierarchically. Whenever a more specific value is available, it will completely replace the less specific value. Note that a few configs have different merge behavior:

See https://docs.getdbt.com/reference/configs-and-properties#combining-configs for details.

A user may expect the cosmos key/value(s) to be merged instead of overwritten entirely if they define it in both dbt_project.yml and model.yml.

We could either accept this as the default behaviour (as with other keys not specified by dbt-core) or make merging the default behaviour. WDYT?

Either way, we could document this behaviour in the Cosmos docs.
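
To make the two options concrete, here is a small sketch with plain dicts standing in for the config from dbt_project.yml and from the model's yml file. The values and the `clobber` / `deep_merge` helpers are illustrative assumptions, not dbt or Cosmos code:

```python
from typing import Any

# Illustrative configs: a project-wide default and a model-level override.
project_level = {"cosmos": {"operator_args": {"retries": 3, "retry_delay": 60}}}
model_level = {"cosmos": {"operator_args": {"retries": 10}}}


def clobber(general: dict[str, Any], specific: dict[str, Any]) -> dict[str, Any]:
    """The more specific value replaces the less specific one wholesale."""
    return {**general, **specific}


def deep_merge(general: dict[str, Any], specific: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge nested dicts; specific values win on conflicts."""
    merged = dict(general)
    for key, value in specific.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


clobbered = clobber(project_level, model_level)
merged = deep_merge(project_level, model_level)
```

With clobbering, the model ends up with only `retries: 10` and silently loses the project-level `retry_delay`; with merging, it gets `retries: 10` and keeps `retry_delay: 60`.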

Should I make an attempt to contribute this feature with your proposed API change?

@dwreeves
Collaborator

dwreeves commented Mar 9, 2024

I'm not a maintainer of this repo, so I won't be the right person to ask that. I was just putting in an API suggestion and a yes vote for the feature.

@tatiana
Collaborator

tatiana commented Mar 12, 2024

@linchun3 this is an excellent proposal - including the suggestion by @dwreeves. We'd love to have this feature! Please, feel free to work on this!

@tatiana tatiana added this to the 1.5.0 milestone May 14, 2024
tatiana added a commit that referenced this issue May 15, 2024
[Daniel Reeves](https://www.linkedin.com/in/daniel-reeves-27700545/)
(@dwreeves ) is an experienced Open-Source Developer currently working
as a Data Architect at Battery Ventures. He has significant experience
with Apache Airflow, SQL, and Python and has contributed to many [OSS
projects](https://github.com/dwreeve).

Not only has he been using Cosmos since its early stages, but since
January 2023, he has actively contributed to the project:
![Screenshot 2024-05-14 at 10 47 30](https://github.com/astronomer/astronomer-cosmos/assets/272048/57829cb6-7eee-4b02-998b-46cc7746f15a)

He has been a critical driver for the Cosmos 1.4 release, and some of
his contributions include new features, bug fixes, and documentation
improvements, including:
* Creation of an Airflow plugin to render dbt docs:
#737
* Support using dbt partial parsing file:
#800
* Add more template fields to `DbtBaseOperator`:
#786
* Add cancel on kill functionality:
#101
* Make region optional in Snowflake profile mapping:
#100
* Fix the dbt docs operator to not look for `graph.pickle`:
#883

He thinks about the project long-term and proposes thorough solutions to
problems faced by the community, as can be seen in GitHub tickets:
* Introducing composability in the middle layer of Cosmos's API:
#895
* Establish a general pattern for uploading artifacts to storage:
#894
* Support `operator_arguments` injection at a node level:
#881

One of Daniel's notable traits is his collaborative and supportive
approach. He has actively engaged with users in the #airflow-dbt Slack
channel, demonstrating his commitment to fostering a supportive
community.

We want to promote him as a Cosmos committer and maintainer for all
these, recognising his constant efforts and achievements towards our
community. Thank you very much, @dwreeves !
@tatiana tatiana added the triage-needed Items need to be reviewed / assigned to milestone label May 17, 2024
@tatiana tatiana modified the milestones: 1.5.0, 1.6.0 May 17, 2024
3 participants