Skip to content

Commit

Permalink
fix(engine): Don't use cloudpickle + process pool for integration
Browse files Browse the repository at this point in the history
  • Loading branch information
daryllimyt committed May 9, 2024
1 parent 3faf424 commit b6b72f6
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
2 changes: 2 additions & 0 deletions tracecat/concurrency.py
Expand Up @@ -12,6 +12,8 @@

logger = Logger("executor.cloudpickle")

# TODO: Do something about these functions


def _run_serialized_fn(serialized_wrapped_fn: bytes, role: Role, /, *args, **kwargs):
# NOTE: This is not the raw function - it is still wrapped by the `wrapper` decorator
Expand Down
7 changes: 2 additions & 5 deletions tracecat/runner/actions.py
Expand Up @@ -46,7 +46,6 @@
from pydantic import BaseModel, Field, validator
from tenacity import retry, stop_after_attempt, wait_exponential

from tracecat.concurrency import CloudpickleProcessPoolExecutor
from tracecat.config import HTTP_MAX_RETRIES
from tracecat.contexts import ctx_action_run, ctx_session_role, ctx_workflow_run
from tracecat.db import create_vdb_conn
Expand Down Expand Up @@ -785,11 +784,9 @@ async def run_integration_action(
params = params or {}

func = registry[qualname]
bound_func = partial(func, **params)
bound_func = partial(func, **params, __role=ctx_session_role.get())

loop = asyncio.get_running_loop()
with CloudpickleProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, bound_func)
result = await asyncio.to_thread(bound_func)

return {
"output": result,
Expand Down

0 comments on commit b6b72f6

Please sign in to comment.