Skip to content

Commit

Permalink
Determine fail_stop on client side when db isolated
Browse files Browse the repository at this point in the history
This is needed because we do not ser the dag on Operator objects.

(cherry picked from commit 00ff95c27f68e1e1564b01dbd3fbc22207976ab7)
  • Loading branch information
dstandish committed Apr 25, 2024
1 parent c2ef1da commit 98343ee
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
15 changes: 13 additions & 2 deletions airflow/models/taskinstance.py
Expand Up @@ -878,6 +878,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
) -> None:
"""
Handle Failure for a task instance.
Expand All @@ -901,6 +902,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
fail_stop=fail_stop,
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
Expand Down Expand Up @@ -2961,9 +2963,14 @@ def fetch_handle_failure_context(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
session: Session = NEW_SESSION,
):
"""Handle Failure for the TaskInstance."""
"""
Handle Failure for the TaskInstance.
:param fail_stop: if true, stop remaining tasks in dag
"""
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)
Expand Down Expand Up @@ -3026,7 +3033,7 @@ def fetch_handle_failure_context(
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and task.dag and task.dag.fail_stop:
if task and fail_stop:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.QUEUED:
Expand Down Expand Up @@ -3075,13 +3082,17 @@ def handle_failure(
:param context: Jinja2 context
:param force_fail: if True, task does not retry
"""
if TYPE_CHECKING:
assert self.task
assert self.task.dag
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=self.task.dag.fail_stop,
)

def is_eligible_to_retry(self):
Expand Down
4 changes: 4 additions & 0 deletions airflow/serialization/pydantic/taskinstance.py
Expand Up @@ -276,13 +276,17 @@ def handle_failure(
"""
from airflow.models.taskinstance import _handle_failure

if TYPE_CHECKING:
assert self.task
assert self.task.dag
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=self.task.dag.fail_stop,
)

def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> None:
Expand Down

0 comments on commit 98343ee

Please sign in to comment.