Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stream/ws_client] Reading long stdout is truncated at 32768 chars #2226

Open
paramjeet01 opened this issue May 5, 2024 · 3 comments
Open
Labels
help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. kind/bug Categorizes issue or PR as related to a bug.

Comments

@paramjeet01
Copy link

paramjeet01 commented May 5, 2024

We are facing this issue in airflow and think it's due to python kubernetes-client.
Earlier repo issue : kubernetes-client/python-base#190
Airflow issue : apache/airflow#39267
Error log :

[2024-04-26, 00:21:32 IST] {pod_manager.py:718} INFO - Checking if xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:721} INFO - The xcom sidecar container is started.
[2024-04-26, 00:21:32 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:36 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:40 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:44 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi
[2024-04-26, 00:21:52 IST] {pod_manager.py:798} INFO - Running command... kill -s SIGINT 1
[2024-04-26, 00:21:52 IST] {pod.py:909} INFO - Deleting pod: hps-mydata-generation-9lqygp1s
[2024-04-26, 00:21:52 IST] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 200, in execute
    result = self.extract_xcom(pod=self.pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 557, in extract_xcom
    result = self.pod_manager.extract_xcom(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 730, in extract_xcom
    result = self.extract_xcom_json(pod)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 325, in iter
    raise retry_exc.reraise()
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 158, in reraise
    raise self.last_attempt.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 765, in extract_xcom_json
    json.loads(result)
  File "/usr/local/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 16385 (char 16384)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/opt/airflow/plugins/operators/kubernetes_pod_operator.py", line 215, in execute
    self.cleanup(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 839, in cleanup
    raise AirflowException
@paramjeet01 paramjeet01 added the kind/bug Categorizes issue or PR as related to a bug. label May 5, 2024
@roycaihw
Copy link
Member

roycaihw commented May 8, 2024

/help

It is unclear where the limitation is coming from. Whether the limitation is coming from this client or some underlying library

@k8s-ci-robot
Copy link
Contributor

@roycaihw:
This request has been marked as needing help from a contributor.

Guidelines

Please ensure that the issue body includes answers to the following questions:

  • Why are we solving this issue?
  • To address this issue, are there any code changes? If there are code changes, what needs to be done in the code and what places can the assignee treat as reference points?
  • Does this issue have zero to low barrier of entry?
  • How can the assignee reach out to you for help?

For more details on the requirements of such an issue, please see here and ensure that they are met.

If this request no longer meets these requirements, the label can be removed
by commenting with the /remove-help command.

In response to this:

/help

It is unclear where the limitation is coming from. Whether the limitation is coming from this client or some underlying library

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@k8s-ci-robot k8s-ci-robot added the help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. label May 8, 2024
@paramjeet01
Copy link
Author

I'm not sure where the data is lost in between but I'm trying this code as suggested by @Paul424 in this issue :
kubernetes-client/python-base#190. My understanding is that the data is lost while reading from the cat command,

def _extract_xcom(self, pod: V1Pod):
    try:
        self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')

        # Can't use _preload_content=True because that would return str(json.load(response))
        client = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,
            _request_timeout=10,
        )
        client.run_forever(timeout=10)
        resp = client.read_all()
        self.log.info("Received {} ({}) ({} ... {}))".format(type(resp), len(resp), resp[:64], resp[-64:]))

        # validate it's valid json
        _ = json.loads(resp)

        # Terminate the sidecar
        _ = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                'kill -s SIGINT 1',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=True,
            _request_timeout=10,
        )

        return resp

    except JSONDecodeError:
        message = f'Failed to decode json document from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message)

    except Exception as e:
        message = f'Failed to extract xcom from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Denotes an issue that needs help from a contributor. Must meet "help wanted" guidelines. kind/bug Categorizes issue or PR as related to a bug.
Projects
None yet
Development

No branches or pull requests

3 participants