Skip to content

Commit

Permalink
Add retry logic for KubernetesCreateResourceOperator and KubernetesJo…
Browse files Browse the repository at this point in the history
…bOperator (apache#39201)
  • Loading branch information
MaksYermak authored and RodrigoGanancia committed May 10, 2024
1 parent ec93b77 commit edec829
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 1 deletion.
8 changes: 8 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, Generator

import aiofiles
import tenacity
from asgiref.sync import sync_to_async
from kubernetes import client, config, watch
from kubernetes.config import ConfigException
Expand All @@ -35,6 +36,7 @@
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol
from airflow.utils import yaml

Expand Down Expand Up @@ -486,6 +488,12 @@ def get_deployment_status(
except Exception as exc:
raise exc

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def create_job(
self,
job: V1Job,
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
Expand Up @@ -23,6 +23,7 @@

import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify

from airflow.compat.functools import cache
Expand Down Expand Up @@ -181,3 +182,18 @@ def annotations_for_logging_task_metadata(annotation_set):
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging


def should_retry_creation(exception: BaseException) -> bool:
"""
Check if an Exception indicates a transient error and warrants retrying.
This function is needed for preventing 'No agent available' error. The error appears time to time
when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
time when this error appears.
More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
"""
if isinstance(exception, ApiException):
return str(exception.status) == "500"
return False
10 changes: 10 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/resource.py
Expand Up @@ -22,12 +22,14 @@
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

import tenacity
import yaml
from kubernetes.utils import create_from_yaml

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

Expand Down Expand Up @@ -126,7 +128,14 @@ def create_custom_from_yaml_object(self, body: dict):
else:
self.custom_object_client.create_cluster_custom_object(group, version, plural, body)

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def _create_objects(self, objects):
self.log.info("Starting resource creation")
if not self.custom_resource_definition:
create_from_yaml(
k8s_client=self.client,
Expand All @@ -144,6 +153,7 @@ def execute(self, context) -> None:
self._create_objects(yaml.safe_load_all(stream))
else:
raise AirflowException("File %s not found", self.yaml_conf_file)
self.log.info("Resource was created")


class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
Expand Down
39 changes: 39 additions & 0 deletions tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
Expand Up @@ -26,6 +26,7 @@

import kubernetes
import pytest
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
from sqlalchemy.orm import make_transient

Expand Down Expand Up @@ -624,6 +625,44 @@ def test_wait_until_job_complete(self, mock_job_status, mock_kube_config_merger,
mock_sleep.assert_has_calls([mock.call(POLL_INTERVAL)] * 4)
assert job_actual == job_expected

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_retries_on_500_error(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [
ApiException(status=500),
MagicMock(),
]

hook = KubernetesHook()
hook.create_job(job=mock.MagicMock())

assert mock_client.create_namespaced_job.call_count == 2

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_fails_on_other_exception(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [ApiException(status=404)]

hook = KubernetesHook()
with pytest.raises(ApiException):
hook.create_job(job=mock.MagicMock())

@patch(f"{HOOK_MODULE}.json.dumps")
@patch(f"{HOOK_MODULE}.KubernetesHook.batch_v1_client")
def test_create_job_retries_three_times(self, mock_client, mock_json_dumps):
mock_client.create_namespaced_job.side_effect = [
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
]

hook = KubernetesHook()
with pytest.raises(ApiException):
hook.create_job(job=mock.MagicMock())

assert mock_client.create_namespaced_job.call_count == 3


class TestKubernetesHookIncorrectConfiguration:
@pytest.mark.parametrize(
Expand Down
61 changes: 60 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_resource.py
Expand Up @@ -16,10 +16,11 @@
# under the License.
from __future__ import annotations

from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
import yaml
from kubernetes.client.rest import ApiException

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
Expand Down Expand Up @@ -237,3 +238,61 @@ def test_delete_not_namespaced_custom_app_from_yaml(self, mock_delete_cluster_cu
"resourceflavors",
"default-flavor-test",
)

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_on_500_error(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
MagicMock(),
]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
op.execute(context)

assert mock_create_from_yaml.call_count == 2

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_fails_on_other_exception(
self, mock_create_from_yaml, mock_load_kube_config, context
):
mock_create_from_yaml.side_effect = [ApiException(status=404)]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)

@patch("kubernetes.config.load_kube_config")
@patch("airflow.providers.cncf.kubernetes.operators.resource.create_from_yaml")
def test_create_objects_retries_three_times(self, mock_create_from_yaml, mock_load_kube_config, context):
mock_create_from_yaml.side_effect = [
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
ApiException(status=500),
]

op = KubernetesCreateResourceOperator(
yaml_conf=TEST_VALID_RESOURCE_YAML,
dag=self.dag,
kubernetes_conn_id="kubernetes_default",
task_id="test_task_id",
config_file="/foo/bar",
)
with pytest.raises(ApiException):
op.execute(context)

assert mock_create_from_yaml.call_count == 3

0 comments on commit edec829

Please sign in to comment.