Skip to content

Commit

Permalink
Refactor cloudpickle support in Python operators/decorators (apache#3…
Browse files Browse the repository at this point in the history
…9270)

* Refactor cloudpickle support in Python operators/decorators

* Fixup missing marker

* Return back skip TestPythonVirtualenvOperator::test_airflow_context for dill

* TestPythonVirtualenvOperator::test_airflow_context xfail instead of skip

* Catch only on ModuleNotFound error and simple reraise with warning

* Limit test_airflow_context only for python 3.11
  • Loading branch information
Taragolis authored and RodrigoGanancia committed May 10, 2024
1 parent fda4bb6 commit c7f7661
Show file tree
Hide file tree
Showing 7 changed files with 566 additions and 513 deletions.
61 changes: 48 additions & 13 deletions airflow/decorators/__init__.pyi
Expand Up @@ -111,14 +111,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -129,6 +130,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
Expand All @@ -154,6 +162,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -164,9 +175,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a virtual environment task.
Expand All @@ -176,9 +188,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -187,6 +203,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch( # type: ignore[misc]
Expand All @@ -211,14 +230,15 @@ class TaskDecoratorCollection:
# _PythonVirtualenvDecoratedOperator.
requirements: None | Iterable[str] | str = None,
python_version: None | str | int | float = None,
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
system_site_packages: bool = True,
templates_dict: Mapping[str, Any] | None = None,
pip_install_options: list[str] | None = None,
skip_on_exit_code: int | Container[int] | None = None,
index_urls: None | Collection[str] | str = None,
venv_cache_path: None | str = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator.
Expand All @@ -232,9 +252,13 @@ class TaskDecoratorCollection:
"requirements file" as specified by pip.
:param python_version: The Python version to run the virtual environment with. Note that
both 2 and 2.7 are acceptable forms.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param system_site_packages: Whether to include
system_site_packages in your virtual environment.
See virtualenv documentation for more information.
Expand All @@ -253,6 +277,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Expand All @@ -264,9 +291,10 @@ class TaskDecoratorCollection:
multiple_outputs: bool | None = None,
# 'python_callable', 'op_args' and 'op_kwargs' since they are filled by
# _PythonVirtualenvDecoratedOperator.
use_dill: bool = False,
serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
templates_dict: Mapping[str, Any] | None = None,
show_return_value_in_logs: bool = True,
use_dill: bool = False,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator.
Expand All @@ -279,9 +307,13 @@ class TaskDecoratorCollection:
(so usually start with "/" or "X:/" depending on the filesystem/os used).
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:param serializer: Which serializer use to serialize the args and result. It can be one of the following:
- ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
- ``"cloudpickle"``: Use cloudpickle for serialize more complex types,
this requires to include cloudpickle in your requirements.
- ``"dill"``: Use dill for serialize more complex types,
this requires to include dill in your requirements.
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
Expand All @@ -290,6 +322,9 @@ class TaskDecoratorCollection:
logs. Defaults to True, which allows return value log output.
It can be set to False to prevent log output of return value when you return huge data
such as transmission a large amount of XCom to TaskAPI.
:param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
the args and result (pickle is default). This allows more complex types
but requires you to include dill in your requirements.
"""
@overload
def branch_external_python(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_virtualenv.py
Expand Up @@ -38,7 +38,7 @@ def tutorial_taskflow_api_virtualenv():
"""

@task.virtualenv(
use_dill=True,
serializer="dill", # Use `dill` for advanced serialization.
system_site_packages=False,
requirements=["funcsigs"],
)
Expand Down

0 comments on commit c7f7661

Please sign in to comment.