-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so, no need to wait for approval.
-
I think you need to be much more detailed in explaining what you want. I have difficulty connecting the first part of your request (ALL_SUCCESS) with the other part (one-by-one); those two have (IMHO) nothing to do with each other. I converted it into a discussion, but really, if you want help with something, it should start with a proper explanation (examples, images, DAGs) of what you have a problem with, longer than one paragraph. It's very difficult to read someone's mind, and there is not enough context here to figure out what this is about.
-
@potiuk sure, I can give more details about my use case. I want to create a cluster and run a few jobs on it, something like this:

```python
chain(create_cluster, job1, job2, job3, teardown_cluster.as_teardown(setups=[create_cluster]))
```

With the current behavior, if the first job fails, the remaining jobs are never started. But I want them to start even if a previous job failed. At the same time, I don't want them all submitted at once, so I can't do this either:

```python
chain(create_cluster, [job1, job2, job3], teardown_cluster.as_teardown(setups=[create_cluster]))
```

Theoretically, what I really need is the ability to configure each edge of the graph with a different rule. In my particular case, each job should be submitted only when the cluster was created successfully and the previous job has finished, regardless of its result:

```
create_cluster.on_success(job1)
create_cluster.on_success(job2)
create_cluster.on_success(job3)
job1.on_complete(job2)
job2.on_complete(job3)
teardown_cluster.as_teardown(setups=[create_cluster])
```

As far as I know, Airflow doesn't provide this kind of functionality, and I assume it would be a major feature. So I'm trying to find a solution that at least partially emulates this behavior. I ended up with this (simplified code):

```python
job1 = operator(trigger_rule=TriggerRule.ALL_DONE)
job2 = operator(trigger_rule=TriggerRule.ALL_DONE)
job3 = operator(trigger_rule=TriggerRule.ALL_DONE)
chain(create_cluster, job1, job2, job3, teardown_cluster.as_teardown(setups=[create_cluster]))
```

With this code I expected that even if the previous job failed, the next job would still be submitted. Instead, the DAG fails validation, because tasks downstream of a setup may only use the ALL_SUCCESS trigger rule. One possible workaround would be to drop the setup/teardown feature altogether, but I don't want to go down that path, because it would add a lot of complexity to my DAGs. BTW, I saw this comment and assumed that this use case is kind of expected, but currently not implemented.
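For anyone who wants to reproduce this, the snippets above can be assembled into a minimal, self-contained sketch. EmptyOperator stands in for the real EMR operators, and the `dag_id` and dates are illustrative assumptions, not from the original post; validating a DAG shaped like this is what trips the restriction under discussion.

```python
# Minimal sketch of the workaround DAG discussed above. EmptyOperator is a
# stand-in for the real create-cluster / job / teardown operators (assumption).
# On current Airflow, validating this DAG raises:
#   ValueError: Setup tasks must be followed with trigger rule ALL_SUCCESS.
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="emr_jobs_one_by_one", start_date=datetime(2024, 1, 1), schedule=None):
    create_cluster = EmptyOperator(task_id="create_cluster")
    teardown_cluster = EmptyOperator(task_id="teardown_cluster")

    # ALL_DONE so each job runs even if the previous one failed ...
    job1 = EmptyOperator(task_id="job1", trigger_rule=TriggerRule.ALL_DONE)
    job2 = EmptyOperator(task_id="job2", trigger_rule=TriggerRule.ALL_DONE)
    job3 = EmptyOperator(task_id="job3", trigger_rule=TriggerRule.ALL_DONE)

    # ... but chained sequentially so only one job is submitted at a time.
    chain(create_cluster, job1, job2, job3, teardown_cluster.as_teardown(setups=[create_cluster]))
```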
-
Any updates here? As a temporary solution I locally patched the setup/teardown validation by subclassing DAG:

```python
# Imports added for completeness (not part of the original snippet);
# FailStopDagInvalidTriggerRule lives in airflow.exceptions in recent Airflow versions.
from airflow import DAG
from airflow.exceptions import FailStopDagInvalidTriggerRule
from airflow.utils.trigger_rule import TriggerRule


class FixedDAG(DAG):
    def validate_setup_teardown(self):
        for task in self.tasks:
            if task.is_setup:
                for down_task in task.downstream_list:
                    if not down_task.is_teardown and down_task.trigger_rule not in (
                        TriggerRule.ALL_SUCCESS,
                        TriggerRule.ALL_DONE,
                    ):
                        raise ValueError(
                            "Setup tasks must be followed with trigger rule ALL_SUCCESS or ALL_DONE."
                        )
            FailStopDagInvalidTriggerRule.check(dag=self, trigger_rule=task.trigger_rule)
```

And it seems to be working fine.
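Assuming the subclass above is importable from your own module (the import path below is hypothetical), usage would look like this sketch: instantiate FixedDAG in place of airflow.DAG and keep the rest of the DAG unchanged, so the relaxed validation runs whenever the DAG is parsed.

```python
# Hypothetical usage of the FixedDAG subclass above; the only change to an
# existing DAG file is which class is instantiated.
from datetime import datetime

from my_dags.fixed_dag import FixedDAG  # illustrative import path, adjust to your project

with FixedDAG(dag_id="emr_jobs_one_by_one", start_date=datetime(2024, 1, 1), schedule=None):
    ...  # same create_cluster >> job1 >> job2 >> job3 >> teardown layout as before
```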
-
Hi y'all. The reason we added that constraint was, IIRC, that without it you could get into odd scenarios when clearing tasks. Let me try to remember.

Suppose you had s1 >> w1 >> w2 >> w3 >> t1.as_teardown(setups=s1), and suppose that w1's trigger rule is all_failed. Say s1 fails: w1 then runs (its all_failed rule is satisfied) and succeeds, w2 and w3 run (both all_success), and t1 does not run (since the setup was not successful).

Now suppose you clear w2 (downstream). This also clears s1 to rerun, since it's a setup for w2. Suppose s1 succeeds this time. Then, according to the trigger rules, w1 should not run, since its trigger rule is all_failed. But it hasn't been cleared, so its trigger rule isn't considered -- it's not up for a run and just stays in the success state. And since w1 stays in success, w2 will run -- even though ordinarily it would not run if the setup was successful. So this results in inconsistent / contradictory dependency-constraint behavior, and we avoid it by requiring that the trigger rule of anything following a setup is all_success.

What do you think? I tried looking at your explanation but didn't understand. I think it might be easier if you explain the real-world use case more conversationally than with pseudocode -- could you try? Why do you want something following a setup to run if the setup fails? The idea of a setup is that... you're setting something up for other things to use...

P.S. I agree with you about being able to configure edges -- that approach makes more sense to me too, but that's not how Airflow works.
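To make the shape in that walkthrough concrete, here is a minimal sketch of the DAG being described, with EmptyOperator standing in for real tasks (operator choice, `dag_id`, and dates are illustrative assumptions): w1 carries the all_failed trigger rule that the current validation rejects, while w2 and w3 keep the default all_success.

```python
# Sketch of the problematic shape described above. EmptyOperator is a stand-in;
# the point is only the wiring and the all_failed trigger rule on w1.
from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="setup_clearing_example", start_date=datetime(2024, 1, 1), schedule=None):
    s1 = EmptyOperator(task_id="s1")
    w1 = EmptyOperator(task_id="w1", trigger_rule=TriggerRule.ALL_FAILED)
    w2 = EmptyOperator(task_id="w2")  # default trigger rule: all_success
    w3 = EmptyOperator(task_id="w3")
    t1 = EmptyOperator(task_id="t1")

    chain(s1, w1, w2, w3, t1.as_teardown(setups=[s1]))
```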
-
@dstandish I don't think you fully got my scenario.
I never asked about that scenario. In your terms, I was talking about the following scenario: s1 >> w1 >> w2 >> w3 >> t1.as_teardown(setups=s1), where w1, w2, and w3 all use the all_done trigger rule.
And with this pipeline I want to achieve the following: if the previous job fails, the next job should still be submitted, one at a time.
-
Description
I found that for tasks within a setup/teardown block, you can use only the ALL_SUCCESS trigger rule. It feels very limiting. For example, I have a use case where I submit a bunch of jobs to an AWS EMR cluster. The jobs are independent, but I want to submit them one by one so as not to overload the cluster. So currently I can't implement this kind of logic together with setup/teardown.
Use case/motivation
No response
Related issues
No response
Are you willing to submit a PR?
Code of Conduct