Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add root flow run id to runtime #13165

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
90 changes: 90 additions & 0 deletions .github/workflows/prefect-client.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
name: Verify prefect-client build

on:
pull_request:
branches:
- main
paths:
- src/prefect/**/*.py
- requirements.txt
- requirements-client.txt
- setup.cfg
push:
branches:
- main
paths:
- src/prefect/**/*.py
- requirements.txt
- requirements-client.txt
- setup.cfg
workflow_call:
inputs:
upload-artifacts:
description: "Whether or not to upload artifacts created in this workflow"
default: false
type: boolean
artifact-name:
description: "The name for the build prefect-client artifact"
default: "prefect-client-pypi-dists"
type: string

jobs:
prefect-client-smoke-test:
name: Build and run prefect-client
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
# Versioneer only generates correct versions with a full fetch
fetch-depth: 0
persist-credentials: false

- name: Set up Python 3.8
uses: actions/setup-python@v5
with:
python-version: "3.8"
cache: "pip"
cache-dependency-path: "requirements-client.txt"

- name: Create a temp dir to stage our build
run: echo "TMPDIR=$(mktemp -d)" >> $GITHUB_ENV

- name: Prepare files for prefect-client build (omit the local build)
run: sh client/build_client.sh
env:
TMPDIR: ${{ env.TMPDIR }}

- name: Build a binary wheel and a source tarball
run: pip install wheel && python setup.py sdist bdist_wheel
working-directory: ${{ env.TMPDIR }}

- name: Install the built client from the locally built package
run: pip install dist/*.tar.gz
working-directory: ${{ env.TMPDIR }}

- name: Run the smoke test flow using the built client
run: python client/client_flow.py
working-directory: ${{ env.TMPDIR }}
env:
PREFECT_API_KEY: ${{ secrets.PREFECT_CLIENT_SA_API_KEY }}
PREFECT_API_URL: "https://api.prefect.cloud/api/accounts/9b649228-0419-40e1-9e0d-44954b5c0ab6/workspaces/96bd3cf8-85c9-4545-9713-b4e3c3e03466" # sandbox, prefect-client workspace

- name: Install prefect from source
run: pip install .

- name: (DEBUG) Check that prefect and prefect-client are installed
run: pip list | grep prefect

- name: Run the smoke test flow again with prefect and prefect-client installed
run: python client/client_flow.py
working-directory: ${{ env.TMPDIR }}
env:
PREFECT_API_KEY: ${{ secrets.PREFECT_CLIENT_SA_API_KEY }}
PREFECT_API_URL: "https://api.prefect.cloud/api/accounts/9b649228-0419-40e1-9e0d-44954b5c0ab6/workspaces/96bd3cf8-85c9-4545-9713-b4e3c3e03466" # sandbox, prefect-client workspace

- name: Publish build artifacts
if: ${{ inputs.upload-artifacts }}
uses: actions/upload-artifact@v4
with:
name: ${{ inputs.artifact-name }}
path: "${{ env.TMPDIR }}/dist"
43 changes: 40 additions & 3 deletions src/prefect/runtime/flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
- `parent_deployment_id`: the ID of the deployment that triggered this run, if any
- `run_count`: the number of times this flow run has been run
"""

import os
from typing import Any, Dict, List, Optional
from uuid import UUID

import pendulum

Expand All @@ -37,6 +39,7 @@
"parameters",
"parent_flow_run_id",
"parent_deployment_id",
"root_flow_run_id",
"run_count",
"api_url",
"ui_url",
Expand Down Expand Up @@ -236,19 +239,52 @@ def get_parent_flow_run_id() -> Optional[str]:
parent_task_run = from_sync.call_soon_in_loop_thread(
create_call(_get_task_run, parent_task_run_id)
).result()
return parent_task_run.flow_run_id
return str(parent_task_run.flow_run_id) if parent_task_run.flow_run_id else None

return None


def get_parent_deployment_id() -> Dict[str, Any]:
def get_parent_deployment_id() -> Optional[UUID]:
parent_flow_run_id = get_parent_flow_run_id()
if parent_flow_run_id is None:
return None

parent_flow_run = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_run, parent_flow_run_id)
).result()
return parent_flow_run.deployment_id if parent_flow_run else None

if parent_flow_run:
return (
str(parent_flow_run.deployment_id)
if parent_flow_run.deployment_id
else None
)

return None


def get_root_flow_run_id() -> str:
run_id = get_id()
parent_flow_run_id = get_parent_flow_run_id()
if parent_flow_run_id is None:
return run_id

def _get_root_flow_run_id(flow_run_id):
flow_run = from_sync.call_soon_in_loop_thread(
create_call(_get_flow_run, flow_run_id)
).result()

if flow_run.parent_task_run_id is None:
return str(flow_run_id)
else:
parent_task_run = from_sync.call_soon_in_loop_thread(
create_call(_get_task_run, flow_run.parent_task_run_id)
).result()
return _get_root_flow_run_id(parent_task_run.flow_run_id)

root_flow_run_id = _get_root_flow_run_id(parent_flow_run_id)

return root_flow_run_id


def get_flow_run_api_url() -> Optional[str]:
Expand All @@ -274,6 +310,7 @@ def get_flow_run_ui_url() -> Optional[str]:
"parameters": get_parameters,
"parent_flow_run_id": get_parent_flow_run_id,
"parent_deployment_id": get_parent_deployment_id,
"root_flow_run_id": get_root_flow_run_id,
"run_count": get_run_count,
"api_url": get_flow_run_api_url,
"ui_url": get_flow_run_ui_url,
Expand Down
73 changes: 67 additions & 6 deletions tests/runtime/test_flow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,8 @@ def foo():
):
assert (
flow_run.parent_flow_run_id
== parent_flow_run.id
== parent_task_run.flow_run_id
== str(parent_flow_run.id)
== str(parent_task_run.flow_run_id)
)

assert flow_run.parent_flow_run_id is None
Expand Down Expand Up @@ -380,8 +380,8 @@ def foo():
monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(child_flow_run.id))
assert (
flow_run.parent_flow_run_id
== parent_flow_run.id
== parent_task_run.flow_run_id
== str(parent_flow_run.id)
== str(parent_task_run.flow_run_id)
)

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(parent_flow_run.id))
Expand Down Expand Up @@ -444,7 +444,7 @@ def foo():
),
flow=Flow(fn=lambda: None, name="child-flow-with-parent-deployment"),
):
assert flow_run.parent_deployment_id == parent_flow_deployment_id
assert flow_run.parent_deployment_id == str(parent_flow_deployment_id)

# No parent flow run
with FlowRunContext.construct(
Expand Down Expand Up @@ -513,7 +513,7 @@ def foo():
monkeypatch.setenv(
name="PREFECT__FLOW_RUN_ID", value=str(child_flow_run_with_deployment.id)
)
assert flow_run.parent_deployment_id == parent_flow_deployment_id
assert flow_run.parent_deployment_id == str(parent_flow_deployment_id)

# No parent flow run
monkeypatch.setenv(
Expand All @@ -522,6 +522,67 @@ def foo():
assert flow_run.parent_deployment_id is None


class TestRootFlowRunId:
async def test_root_flow_run_id_is_attribute(self):
assert "root_flow_run_id" in dir(flow_run)

async def test_root_flow_run_id_is_empty_when_not_set(self):
assert flow_run.root_flow_run_id is None

async def test_root_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
):
assert flow_run.root_flow_run_id is None

root_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="root"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=None,
)

@task
def root_task():
return 1

root_task_run = await prefect_client.create_task_run(
task=root_task,
dynamic_key="1",
flow_run_id=root_flow_run.id,
)

child_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="child"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=root_task_run.id,
)

@task
def child_task():
return 1

child_task_run = await prefect_client.create_task_run(
task=child_task,
dynamic_key="1",
flow_run_id=child_flow_run.id,
)

deep_flow_run = await prefect_client.create_flow_run(
flow=Flow(fn=lambda: None, name="deep"),
parameters={"x": "foo", "y": "bar"},
parent_task_run_id=child_task_run.id,
)

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(deep_flow_run.id))
assert (
flow_run.root_flow_run_id
== str(root_flow_run.id)
== str(root_task_run.flow_run_id)
)

monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value=str(root_flow_run.id))
assert flow_run.root_flow_run_id == str(root_flow_run.id)


class TestURL:
@pytest.mark.parametrize("url_type", ["api_url", "ui_url"])
async def test_url_is_attribute(self, url_type):
Expand Down