Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling exception getting logs when pods finish success #39296

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 15 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Expand Up @@ -36,6 +36,7 @@
import tenacity
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.client.exceptions import ApiException
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError

Expand Down Expand Up @@ -788,9 +789,18 @@ def _clean(self, event: dict[str, Any]):
# Skip await_pod_completion when the event is 'timeout' due to the pod can hang
# on the ErrImagePull or ContainerCreating step and it will never complete
if event["status"] != "timeout":
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
try:
self.pod = self.pod_manager.await_pod_completion(
self.pod, istio_enabled, self.base_container_name
)
except ApiException as e:
if e.status == 404:
self.pod = None
amoghrajesh marked this conversation as resolved.
Show resolved Hide resolved
self.log.warning(
"Pod not found while waiting for completion. The last status was %r", event["status"]
)
else:
raise e
if self.pod is not None:
self.post_complete_action(
pod=self.pod,
Expand Down Expand Up @@ -818,11 +828,11 @@ def write_logs(self, pod: k8s.V1Pod, follow: bool = False, since_time: DateTime
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
if line:
self.log.info("Container logs: %s", line)
except HTTPError as e:
except (HTTPError, ApiException) as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
"Set log level to DEBUG for traceback.",
e,
e if not isinstance(e, ApiException) else e.reason,
)

def post_complete_action(self, *, pod, remote_pod, **kwargs):
Expand Down
26 changes: 24 additions & 2 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Expand Up @@ -25,7 +25,7 @@
import pendulum
import pytest
from kubernetes.client import ApiClient, V1Pod, V1PodSecurityContext, V1PodStatus, models as k8s
from kubernetes.client.rest import ApiException
from kubernetes.client.exceptions import ApiException
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException, AirflowSkipException, TaskDeferred
Expand Down Expand Up @@ -1765,7 +1765,7 @@ def run_pod_async(self, operator: KubernetesPodOperator, map_index: int = -1):
remote_pod_mock.metadata.namespace = TEST_NAMESPACE
self.await_pod_mock.return_value = remote_pod_mock

operator.execute_complete(
operator.trigger_reentry(
context=context,
event={
"status": "success",
Expand Down Expand Up @@ -2053,6 +2053,28 @@ def test_async_write_logs_should_execute_successfully(
else:
mock_manager.return_value.read_pod_logs.assert_not_called()

@pytest.mark.parametrize("evaluate_status", [404, None])
@patch(KUB_OP_PATH.format("post_complete_action"))
@patch(KUB_OP_PATH.format("extract_xcom"))
@patch(HOOK_CLASS)
@patch(KUB_OP_PATH.format("pod_manager"))
def test_async_write_logs_handler_api_exception(
self, mock_manager, mocked_hook, mock_extract_xcom, evaluate_status, post_complete_action
):
mock_manager.read_pod_logs.side_effect = ApiException(status=evaluate_status)
mock_manager.await_pod_completion.side_effect = ApiException(status=404)
mocked_hook.return_value.get_pod.return_value = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAMESPACE)
)
mock_extract_xcom.return_value = "{}"
k = KubernetesPodOperator(
task_id="task",
get_logs=True,
deferrable=True,
)
self.run_pod_async(k)
post_complete_action.assert_not_called()

@pytest.mark.parametrize(
"log_pod_spec_on_failure,expect_match",
[
Expand Down