Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry logic for KubernetesCreateResourceOperator and KubernetesJobOperator #39201

Merged
merged 1 commit into from May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, Generator

import aiofiles
import tenacity
from asgiref.sync import sync_to_async
from kubernetes import client, config, watch
from kubernetes.config import ConfigException
Expand All @@ -35,6 +36,7 @@
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol
from airflow.utils import yaml

Expand Down Expand Up @@ -486,6 +488,12 @@ def get_deployment_status(
except Exception as exc:
raise exc

@tenacity.retry(
potiuk marked this conversation as resolved.
Show resolved Hide resolved
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def create_job(
self,
job: V1Job,
Expand Down
Expand Up @@ -23,6 +23,7 @@

import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify

from airflow.compat.functools import cache
Expand Down Expand Up @@ -181,3 +182,18 @@ def annotations_for_logging_task_metadata(annotation_set):
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging


def should_retry_creation(exception: BaseException) -> bool:
"""
Check if an Exception indicates a transient error and warrants retrying.

This function is needed for preventing 'No agent available' error. The error appears time to time
when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
time when this error appears.
More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
"""
if isinstance(exception, ApiException):
return str(exception.status) == "500"
return False
10 changes: 10 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/resource.py
Expand Up @@ -22,12 +22,14 @@
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

import tenacity
import yaml
from kubernetes.utils import create_from_yaml

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

Expand Down Expand Up @@ -126,7 +128,14 @@ def create_custom_from_yaml_object(self, body: dict):
else:
self.custom_object_client.create_cluster_custom_object(group, version, plural, body)

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def _create_objects(self, objects):
self.log.info("Starting resource creation")
if not self.custom_resource_definition:
create_from_yaml(
k8s_client=self.client,
Expand All @@ -144,6 +153,7 @@ def execute(self, context) -> None:
self._create_objects(yaml.safe_load_all(stream))
else:
raise AirflowException("File %s not found", self.yaml_conf_file)
self.log.info("Resource was created")


class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
Expand Down
39 changes: 39 additions & 0 deletions tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
Expand Up @@ -26,6 +26,7 @@

import kubernetes
import pytest
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
from sqlalchemy.orm import make_transient

Expand Down Expand Up @@ -624,6 +625,44 @@ def test_wait_until_job_complete(self, mock_job_status, mock_kube_config_merger,
mock_sleep.assert_has_calls([mock.call(POLL_INTERVAL)] * 4)
assert job_actual == job_expected

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_retries_on_500_error(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [
ApiException(status=500),
MagicMock(),
]

hook = KubernetesHook()
hook.create_job(job=mock.MagicMock())

assert mock_client.create_namespaced_job.call_count == 2

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_fails_on_other_exception(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [ApiException(status=404)]

hook = KubernetesHook()
with pytest.raises(ApiException):
hook.create_job(job=mock.MagicMock())

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_retries_three_times(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
]

hook = KubernetesHook()
with pytest.raises(ApiException):
hook.create_job(job=mock.MagicMock())

assert mock_client.create_namespaced_job.call_count == 3


class TestKubernetesHookIncorrectConfiguration:
@pytest.mark.parametrize(
Expand Down
61 changes: 60 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_resource.py
Expand Up @@ -16,10 +16,11 @@
# under the License.
from __future__ import annotations

from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
import yaml
from kubernetes.client.rest import ApiException

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
Expand Down Expand Up @@ -237,3 +238,61 @@ def test_delete_not_namespaced_custom_app_from_yaml(self, mock_delete_cluster_cu
"resourceflavors",
"default-flavor-test",
)

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
MagicMock(),
]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)

assert mock_create_from_yaml.call_count == 2

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_fails_on_other_exception(
self, mock_create_from_yaml, mock_load_kube_config, context
):
mock_create_from_yaml.side_effect = [ApiException(status=404)]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)

assert mock_create_from_yaml.call_count == 3