Skip to content

Commit

Permalink
Handling exception getting logs when pods finish success
Browse files Browse the repository at this point in the history
  • Loading branch information
acsierra committed Apr 28, 2024
1 parent 25f901a commit 8c0afca
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/__init__.py
Expand Up @@ -27,7 +27,7 @@

__all__ = ["__version__"]

__version__ = "8.1.1"
__version__ = "8.1.2"

try:
from airflow import __version__ as airflow_version
Expand Down
17 changes: 12 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -36,6 +36,7 @@
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from kubernetes.client.exceptions import ApiException
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
Expand Down Expand Up @@ -766,9 +767,15 @@ def _clean(self, event: dict[str, Any]):
# Skip await_pod_completion when the event is 'timeout' due to the pod can hang
# on the ErrImagePull or ContainerCreating step and it will never complete
if event["status"] != "timeout":
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
try:
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
except ApiException as e:
if e.status == 404:
self.pod = None
else:
raise e
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
Expand Down Expand Up @@ -796,11 +803,11 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
if line:
self.log.info("Container logs: %s", line)
except HTTPError as e:
except (HTTPError, ApiException) as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e,
e if not isinstance(e, ApiException) else e.reason,
)

def post_complete_action(self, *, pod, remote_pod, **kwargs):
Expand Down

0 comments on commit 8c0afca

Please sign in to comment.