Skip to content

Commit

Permalink
Add retry logic for KubernetesCreateResourceOperator and KubernetesJo…
Browse files Browse the repository at this point in the history
…bOperator
  • Loading branch information
MaksYermak committed Apr 23, 2024
1 parent a6f612d commit f265b2a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/providers/cncf/kubernetes/hooks/kubernetes.py
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Any, Generator

import aiofiles
import tenacity
from asgiref.sync import sync_to_async
from kubernetes import client, config, watch
from kubernetes.config import ConfigException
Expand All @@ -35,6 +36,7 @@
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodOperatorHookProtocol
from airflow.utils import yaml

Expand Down Expand Up @@ -486,6 +488,12 @@ def get_deployment_status(
except Exception as exc:
raise exc

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def create_job(
self,
job: V1Job,
Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py
Expand Up @@ -23,6 +23,7 @@
from typing import TYPE_CHECKING

import pendulum
from kubernetes.client.rest import ApiException
from slugify import slugify

from airflow.compat.functools import cache
Expand Down Expand Up @@ -199,3 +200,18 @@ def annotations_for_logging_task_metadata(annotation_set):
else:
annotations_for_logging = "<omitted>"
return annotations_for_logging


def should_retry_creation(exception: BaseException) -> bool:
"""
Check if an Exception indicates a transient error and warrants retrying.
This function is needed for preventing 'No agent available' error. The error appears time to time
when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
time when this error appears.
More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
"""
if isinstance(exception, ApiException):
return str(exception.status) == "500"
return False
10 changes: 10 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/resource.py
Expand Up @@ -22,12 +22,14 @@
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

import tenacity
import yaml
from kubernetes.utils import create_from_yaml

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml
from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator

Expand Down Expand Up @@ -126,7 +128,14 @@ def create_custom_from_yaml_object(self, body: dict):
else:
self.custom_object_client.create_cluster_custom_object(group, version, plural, body)

@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_random_exponential(),
reraise=True,
retry=tenacity.retry_if_exception(should_retry_creation),
)
def _create_objects(self, objects):
self.log.info("Starting resource creation")
if not self.custom_resource_definition:
create_from_yaml(
k8s_client=self.client,
Expand All @@ -144,6 +153,7 @@ def execute(self, context) -> None:
self._create_objects(yaml.safe_load_all(stream))
else:
raise AirflowException("File %s not found", self.yaml_conf_file)
self.log.info("Resource was created")


class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
Expand Down

0 comments on commit f265b2a

Please sign in to comment.