Skip to content

Commit

Permalink
Use ProcessPoolExecutor over ThreadPoolExecutor.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Make `max_workers` configurable.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed May 7, 2024
1 parent 7550a11 commit 7670638
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/openlineage/conf.py
Expand Up @@ -101,3 +101,9 @@ def _is_true(val):
# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""

@cache
def dag_state_change_process_pool_size() -> int:
"""[openlineage] dag_state_change_process_pool_size."""
option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1)
return option
5 changes: 3 additions & 2 deletions airflow/providers/openlineage/plugins/listener.py
Expand Up @@ -17,13 +17,14 @@
from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING

from openlineage.client.serde import Serde

from airflow.listeners import hookimpl
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
Expand Down Expand Up @@ -250,7 +251,7 @@ def on_failure():
@property
def executor(self):
if not self._executor:
self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_")
self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size())
return self._executor

@hookimpl
Expand Down
21 changes: 21 additions & 0 deletions tests/providers/openlineage/plugins/test_listener.py
Expand Up @@ -522,6 +522,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope
listener.extractor_manager.extract_metadata.assert_not_called()
listener.adapter.complete_task.assert_not_called()

@pytest.mark.parametrize(
"max_workers,expected",
[
(None, 1),
("8", 8),
],
)
@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True)
def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected):
"""mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers"""
listener = OpenLineageListener()
# mock ProcessPoolExecutor class
try:
with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}):
listener.on_dag_run_running(mock.MagicMock(), None)
mock_executor.assert_called_once_with(max_workers=expected)
mock_executor.return_value.submit.assert_called_once()
finally:
conf.dag_state_change_process_pool_size.cache_clear()



class TestOpenLineageSelectiveEnable:
def setup_method(self):
Expand Down

0 comments on commit 7670638

Please sign in to comment.