Skip to content

Commit

Permalink
expose ui/api urls in task run runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Jan 28, 2025
1 parent 776d354 commit 5d6d201
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 37 deletions.
46 changes: 37 additions & 9 deletions src/prefect/runtime/task_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@
from __future__ import annotations

import os
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable

from prefect.context import TaskRunContext
from prefect.settings import get_current_settings

__all__ = ["id", "tags", "name", "parameters", "run_count", "task_name"]
__all__ = [
"id",
"tags",
"name",
"parameters",
"run_count",
"task_name",
"api_url",
"ui_url",
]


type_cast: dict[
Expand Down Expand Up @@ -72,17 +82,17 @@ def __getattr__(name: str) -> Any:
return real_value


def __dir__() -> List[str]:
def __dir__() -> list[str]:
return sorted(__all__)


def get_id() -> str:
def get_id() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is not None:
return str(task_run_ctx.task_run.id)


def get_tags() -> List[str]:
def get_tags() -> list[str]:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return []
Expand All @@ -98,35 +108,53 @@ def get_run_count() -> int:
return task_run_ctx.task_run.run_count


def get_name() -> Optional[str]:
def get_name() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return None
else:
return task_run_ctx.task_run.name


def get_task_name() -> Optional[str]:
def get_task_name() -> str | None:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is None:
return None
else:
return task_run_ctx.task.name


def get_parameters() -> Dict[str, Any]:
def get_parameters() -> dict[str, Any]:
task_run_ctx = TaskRunContext.get()
if task_run_ctx is not None:
return task_run_ctx.parameters
else:
return {}


FIELDS: dict[str, Callable[[], Any]] = {
def get_task_run_api_url() -> str | None:
if (api_url := get_current_settings().api.url) is None:
return None
if (task_run_id := get_id()) is None:
return None
return f"{api_url}/runs/task-run/{task_run_id}"


def get_task_run_ui_url() -> str | None:
if (ui_url := get_current_settings().ui_url) is None:
return None
if (task_run_id := get_id()) is None:
return None
return f"{ui_url}/runs/task-run/{task_run_id}"


FIELDS: dict[str, Callable[[], Any | None]] = {
"id": get_id,
"tags": get_tags,
"name": get_name,
"parameters": get_parameters,
"run_count": get_run_count,
"task_name": get_task_name,
"api_url": get_task_run_api_url,
"ui_url": get_task_run_ui_url,
}
68 changes: 44 additions & 24 deletions tests/runtime/test_flow_run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import datetime
from typing import Any

import pendulum
import pytest
Expand All @@ -20,13 +21,13 @@ async def test_access_unknown_attribute_fails(self):

async def test_import_unknown_attribute_fails(self):
with pytest.raises(ImportError, match="boop"):
from prefect.runtime.flow_run import boop # noqa
from prefect.runtime.flow_run import boop # noqa # type: ignore

async def test_known_attributes_autocomplete(self):
assert "id" in dir(flow_run)
assert "foo" not in dir(flow_run)

async def test_new_attribute_via_env_var(self, monkeypatch):
async def test_new_attribute_via_env_var(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv(name="PREFECT__RUNTIME__FLOW_RUN__NEW_KEY", value="foobar")
assert flow_run.new_key == "foobar"

Expand All @@ -47,7 +48,12 @@ async def test_new_attribute_via_env_var(self, monkeypatch):
],
)
async def test_attribute_override_via_env_var(
self, monkeypatch, attribute_name, attribute_value, env_value, expected_value
self,
monkeypatch: pytest.MonkeyPatch,
attribute_name: str,
attribute_value: Any,
env_value: str,
expected_value: Any,
):
# mock attribute_name to be a function that generates attribute_value
monkeypatch.setitem(flow_run.FIELDS, attribute_name, lambda: attribute_value)
Expand All @@ -71,7 +77,7 @@ async def test_attribute_override_via_env_var(
],
)
async def test_attribute_override_via_env_var_not_allowed(
self, monkeypatch, attribute_name, attribute_value
self, monkeypatch: pytest.MonkeyPatch, attribute_name: str, attribute_value: Any
):
# mock attribute_name to be a function that generates attribute_value
monkeypatch.setitem(flow_run.FIELDS, attribute_name, lambda: attribute_value)
Expand All @@ -96,12 +102,12 @@ async def test_id_is_attribute(self):
async def test_id_is_none_when_not_set(self):
assert flow_run.id is None

async def test_id_uses_env_var_when_set(self, monkeypatch):
async def test_id_uses_env_var_when_set(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value="foo")
assert flow_run.id == "foo"

async def test_id_prioritizes_context_info_over_env_var_dynamically(
self, monkeypatch
self, monkeypatch: pytest.MonkeyPatch
):
monkeypatch.setenv(name="PREFECT__FLOW_RUN_ID", value="foo")

Expand Down Expand Up @@ -142,7 +148,9 @@ def run_with_tags():

assert flow_run.tags == []

async def test_tags_pulls_from_api_when_needed(self, monkeypatch, prefect_client):
async def test_tags_pulls_from_api_when_needed(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test"), tags=["red", "green"]
)
Expand Down Expand Up @@ -170,7 +178,9 @@ async def test_run_count_returns_run_count_when_present_dynamically(self):

assert flow_run.run_count == 0

async def test_run_count_from_api(self, monkeypatch, prefect_client):
async def test_run_count_from_api(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test", retries=5)
)
Expand All @@ -193,7 +203,7 @@ async def test_scheduled_start_time_is_timestamp_when_not_set(self):
assert isinstance(flow_run.scheduled_start_time, datetime.datetime)

async def test_scheduled_start_time_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
TIMESTAMP = pendulum.now("utc").add(days=7)
run = await prefect_client.create_flow_run(
Expand Down Expand Up @@ -224,7 +234,9 @@ async def test_name_returns_name_when_present_dynamically(self):

assert flow_run.name is None

async def test_name_pulls_from_api_when_needed(self, monkeypatch, prefect_client):
async def test_name_pulls_from_api_when_needed(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="test"), name="foo"
)
Expand Down Expand Up @@ -253,7 +265,7 @@ async def test_flow_name_returns_flow_name_when_present_dynamically(self):
assert flow_run.flow_name is None

async def test_flow_name_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="foo"), name="bar"
Expand All @@ -279,7 +291,9 @@ async def test_parameters_from_context(self):
):
assert flow_run.parameters == {"x": "foo", "y": "bar"}

async def test_parameters_from_api(self, monkeypatch, prefect_client):
async def test_parameters_from_api(
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
run = await prefect_client.create_flow_run(
flow=flow(lambda: None, name="foo"), parameters={"x": "foo", "y": "bar"}
)
Expand All @@ -300,7 +314,9 @@ def my_flow(x):

assert my_flow(foo) == {"x": foo}

async def test_outside_flow_run_uses_serialized_parameters(self, monkeypatch):
async def test_outside_flow_run_uses_serialized_parameters(
self, monkeypatch: pytest.MonkeyPatch
):
@dataclasses.dataclass
class Foo:
y: int
Expand All @@ -325,7 +341,7 @@ async def test_parent_flow_run_id_is_empty_when_not_set(self):
assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_returns_parent_flow_run_id_when_present_dynamically(
self, prefect_client
self, prefect_client: PrefectClient
):
assert flow_run.parent_flow_run_id is None

Expand Down Expand Up @@ -360,7 +376,7 @@ def foo():
assert flow_run.parent_flow_run_id is None

async def test_parent_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.parent_flow_run_id is None

Expand Down Expand Up @@ -402,7 +418,7 @@ async def test_parent_deployment_id_is_empty_when_not_set(self):
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_returns_parent_deployment_id_when_present_dynamically(
self, prefect_client
self, prefect_client: PrefectClient
):
assert flow_run.parent_deployment_id is None

Expand Down Expand Up @@ -460,7 +476,7 @@ def foo():
assert flow_run.parent_deployment_id is None

async def test_parent_deployment_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client: PrefectClient
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.parent_deployment_id is None

Expand Down Expand Up @@ -536,7 +552,7 @@ async def test_root_flow_run_id_is_empty_when_not_set(self):
assert flow_run.root_flow_run_id is None

async def test_root_flow_run_id_pulls_from_api_when_needed(
self, monkeypatch, prefect_client
self, monkeypatch: pytest.MonkeyPatch, prefect_client: PrefectClient
):
assert flow_run.root_flow_run_id is None

Expand Down Expand Up @@ -591,11 +607,11 @@ def child_task():

class TestURL:
@pytest.mark.parametrize("url_type", ["api_url", "ui_url"])
async def test_url_is_attribute(self, url_type):
async def test_url_is_attribute(self, url_type: str):
assert url_type in dir(flow_run)

@pytest.mark.parametrize("url_type", ["api_url", "ui_url"])
async def test_url_is_none_when_id_not_set(self, url_type):
async def test_url_is_none_when_id_not_set(self, url_type: str):
assert getattr(flow_run, url_type) is None

@pytest.mark.parametrize(
Expand All @@ -604,13 +620,15 @@ async def test_url_is_none_when_id_not_set(self, url_type):
)
async def test_url_returns_correct_url_when_id_present(
self,
url_type,
url_type: str,
):
test_id = "12345"
if url_type == "api_url":
base_url_value = PREFECT_API_URL.value()
elif url_type == "ui_url":
base_url_value = PREFECT_UI_URL.value()
else:
raise ValueError(f"Invalid url_type: {url_type}")

expected_url = f"{base_url_value}/flow-runs/flow-run/{test_id}"

Expand All @@ -627,9 +645,9 @@ async def test_url_returns_correct_url_when_id_present(
)
async def test_url_pulls_from_api_when_needed(
self,
monkeypatch,
prefect_client,
url_type,
monkeypatch: pytest.MonkeyPatch,
prefect_client: PrefectClient,
url_type: str,
):
run = await prefect_client.create_flow_run(flow=flow(lambda: None, name="test"))

Expand All @@ -639,6 +657,8 @@ async def test_url_pulls_from_api_when_needed(
base_url_value = PREFECT_API_URL.value()
elif url_type == "ui_url":
base_url_value = PREFECT_UI_URL.value()
else:
raise ValueError(f"Invalid url_type: {url_type}")

expected_url = f"{base_url_value}/flow-runs/flow-run/{str(run.id)}"

Expand Down
Loading

0 comments on commit 5d6d201

Please sign in to comment.