don't make extra calls to cloud in prefect dashboard open #16768

Draft · wants to merge 5 commits into base: main
Changes from all commits

3 changes: 3 additions & 0 deletions src/prefect/cli/dashboard.py
@@ -28,6 +28,9 @@ async def open() -> None:

await run_sync_in_worker_thread(webbrowser.open_new_tab, ui_url)

if "prefect.cloud" not in ui_url:
exit_with_success(f"Opened {ui_url!r} in browser.")

async with get_cloud_client() as client:
try:
current_workspace = get_current_workspace(await client.read_workspaces())
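
The hunk above gives `prefect dashboard open` an early exit: once the browser tab has been opened, the command finishes immediately whenever the UI URL is not a Prefect Cloud URL, so no Cloud client is created and no /me/workspaces request is made for OSS/self-hosted servers. Below is a minimal, self-contained sketch of that routing logic; open_dashboard and the inline exit_with_success stub are stand-ins invented for illustration, not the verbatim Prefect helpers.

import asyncio
import webbrowser

def exit_with_success(message: str) -> None:
    # Stand-in for Prefect's CLI helper, which prints the message and ends the command.
    print(message)

async def open_dashboard(ui_url: str) -> None:
    # The real command opens the tab in a worker thread; done inline here.
    webbrowser.open_new_tab(ui_url)

    # New early exit: a self-hosted/OSS server has no Cloud workspace to look up,
    # so report success before any Cloud client is created or /me/workspaces is hit.
    if "prefect.cloud" not in ui_url:
        exit_with_success(f"Opened {ui_url!r} in browser.")
        return  # only needed in this stub; see note below

    # Only prefect.cloud URLs reach this point; the real command queries the
    # Cloud API for the current workspace here, exactly as before.
    print("Cloud URL detected; fetching workspaces from the Cloud API...")

asyncio.run(open_dashboard("http://localhost:4200"))

In the actual CLI the success helper ends the command itself, which is why the diff does not need an explicit return after exit_with_success.
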
95 changes: 50 additions & 45 deletions src/prefect/cli/deploy.py
@@ -12,9 +12,9 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from uuid import UUID

import pydantic
import typer
import yaml
from pydantic import TypeAdapter
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
@@ -44,6 +44,7 @@
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
@@ -52,7 +53,6 @@
IntervalSchedule,
RRuleSchedule,
)
from prefect.client.utilities import inject_client
from prefect.deployments import initialize_project
from prefect.deployments.base import (
_format_deployment_for_saving_to_prefect_file,
@@ -83,11 +83,17 @@
from prefect.client.orchestration import PrefectClient


DeploymentTriggerAdapter: TypeAdapter[DeploymentTriggerTypes] = TypeAdapter(
DeploymentTriggerTypes
)
SlaAdapter: TypeAdapter[SlaTypes] = TypeAdapter(SlaTypes)


@app.command()
async def init(
name: Optional[str] = None,
recipe: Optional[str] = None,
fields: Optional[List[str]] = typer.Option(
fields: Optional[list[str]] = typer.Option(
None,
"-f",
"--field",
@@ -100,7 +106,7 @@ async def init(
"""
Initialize a new deployment configuration recipe.
"""
inputs = {}
inputs: dict[str, Any] = {}
fields = fields or []
recipe_paths = prefect.__module_path__ / "deployments" / "recipes"

@@ -110,7 +116,7 @@

if not recipe and is_interactive():
recipe_paths = prefect.__module_path__ / "deployments" / "recipes"
recipes = []
recipes: list[dict[str, Any]] = []

for r in recipe_paths.iterdir():
if r.is_dir() and (r / "prefect.yaml").exists():
@@ -471,14 +477,14 @@ async def deploy(
exit_with_error(str(exc))


@inject_client
async def _run_single_deploy(
deploy_config: dict[str, Any],
actions: dict[str, Any],
options: Optional[dict[str, Any]] = None,
options: dict[str, Any] | None = None,
client: Optional["PrefectClient"] = None,
prefect_file: Path = Path("prefect.yaml"),
):
client = client or get_client()
deploy_config = deepcopy(deploy_config) if deploy_config else {}
actions = deepcopy(actions) if actions else {}
options = deepcopy(options) if options else {}
@@ -863,9 +869,9 @@ async def _run_single_deploy(


async def _run_multi_deploy(
deploy_configs: List[Dict],
actions: Dict,
names: Optional[List[str]] = None,
deploy_configs: list[dict[str, Any]],
actions: dict[str, Any],
names: Optional[list[str]] = None,
deploy_all: bool = False,
prefect_file: Path = Path("prefect.yaml"),
):
@@ -902,8 +908,8 @@


def _construct_schedules(
deploy_config: Dict,
) -> List[DeploymentScheduleCreate]:
deploy_config: dict[str, Any],
) -> list[DeploymentScheduleCreate]:
"""
Constructs a schedule from a deployment configuration.

@@ -913,6 +919,7 @@ def _construct_schedules(
Returns:
A list of schedule objects
"""
schedules: list[DeploymentScheduleCreate] = [] # Initialize with empty list
schedule_configs = deploy_config.get("schedules", NotSet) or []

if schedule_configs is not NotSet:
@@ -923,14 +930,12 @@
elif schedule_configs is NotSet:
if is_interactive():
schedules = prompt_schedules(app.console)
else:
schedules = []

return schedules


def _schedule_config_to_deployment_schedule(
schedule_config: Dict,
schedule_config: dict[str, Any],
) -> DeploymentScheduleCreate:
anchor_date = schedule_config.get("anchor_date")
timezone = schedule_config.get("timezone")
@@ -969,7 +974,7 @@ def _schedule_config_to_deployment_schedule(
)


def _merge_with_default_deploy_config(deploy_config: Dict):
def _merge_with_default_deploy_config(deploy_config: dict[str, Any]) -> dict[str, Any]:
"""
Merge a base deploy config with the default deploy config.
If a key is missing in the base deploy config, it will be filled with the
@@ -983,7 +988,7 @@ def _merge_with_default_deploy_config(deploy_config: Dict):
The merged deploy config.
"""
deploy_config = deepcopy(deploy_config)
DEFAULT_DEPLOY_CONFIG = {
DEFAULT_DEPLOY_CONFIG: dict[str, Any] = {
"name": None,
"version": None,
"tags": [],
@@ -1014,9 +1019,9 @@

async def _generate_git_clone_pull_step(
console: Console,
deploy_config: Dict,
deploy_config: dict[str, Any],
remote_url: str,
):
) -> list[dict[str, Any]]:
branch = get_git_branch() or "main"

if not remote_url:
@@ -1104,9 +1109,9 @@ async def _generate_git_clone_pull_step(


async def _generate_pull_step_for_build_docker_image(
console: Console, deploy_config: Dict, auto: bool = True
):
pull_step = {}
console: Console, deploy_config: dict[str, Any], auto: bool = True
) -> list[dict[str, Any]]:
pull_step: dict[str, Any] = {}
dir_name = os.path.basename(os.getcwd())
if auto:
pull_step["directory"] = f"/opt/prefect/{dir_name}"
@@ -1138,9 +1143,9 @@ async def _check_for_build_docker_image_step(


async def _generate_actions_for_remote_flow_storage(
console: Console, deploy_config: dict, actions: List[Dict]
) -> Dict[str, List[Dict[str, Any]]]:
storage_provider_to_collection = {
console: Console, deploy_config: dict[str, Any], actions: list[dict[str, Any]]
) -> dict[str, list[dict[str, Any]]]:
storage_provider_to_collection: dict[str, str] = {
"s3": "prefect_aws",
"gcs": "prefect_gcp",
"azure_blob_storage": "prefect_azure",
@@ -1195,9 +1200,9 @@ async def _generate_actions_for_remote_flow_storage(

async def _generate_default_pull_action(
console: Console,
deploy_config: Dict,
actions: List[Dict],
):
deploy_config: dict[str, Any],
actions: list[dict[str, Any]],
) -> list[dict[str, Any]]:
build_docker_image_step = await _check_for_build_docker_image_step(
deploy_config.get("build") or actions["build"]
)
@@ -1224,7 +1229,7 @@ async def _generate_default_pull_action(
entrypoint_path, _ = deploy_config["entrypoint"].split(":")
console.print(
"Your Prefect workers will attempt to load your flow from:"
f" [green]{(Path.cwd()/Path(entrypoint_path)).absolute().resolve()}[/]. To"
f" [green]{(Path.cwd() / Path(entrypoint_path)).absolute().resolve()}[/]. To"
" see more options for managing your flow's code, run:\n\n\t[blue]$"
" prefect init[/]\n"
)
@@ -1318,10 +1323,12 @@ def _log_missing_deployment_names(missing_names, matched_deploy_configs, names):
)


def _filter_matching_deploy_config(name, deploy_configs):
def _filter_matching_deploy_config(
name: str, deploy_configs: list[dict[str, Any]]
) -> list[dict[str, Any]]:
# Logic to find the deploy_config matching the given name
# This function handles both "flow-name/deployment-name" and just "deployment-name"
matching_deployments = []
matching_deployments: list[dict[str, Any]] = []
if "/" in name:
flow_name, deployment_name = name.split("/")
flow_name = flow_name.replace("-", "_")
@@ -1637,7 +1644,7 @@ def _check_for_matching_deployment_name_and_entrypoint_in_prefect_file(


def _check_if_identical_deployment_in_prefect_file(
untemplated_deploy_config: Dict, prefect_file: Path = Path("prefect.yaml")
untemplated_deploy_config: dict[str, Any], prefect_file: Path = Path("prefect.yaml")
) -> bool:
"""
Check if the given deploy config is identical to an existing deploy config in the
@@ -1662,22 +1669,20 @@ def _check_if_identical_deployment_in_prefect_file(


def _initialize_deployment_triggers(
deployment_name: str, triggers_spec: List[Dict[str, Any]]
) -> List[DeploymentTriggerTypes]:
triggers = []
deployment_name: str, triggers_spec: list[dict[str, Any]]
) -> list[DeploymentTriggerTypes]:
triggers: list[DeploymentTriggerTypes] = []
for i, spec in enumerate(triggers_spec, start=1):
spec.setdefault("name", f"{deployment_name}__automation_{i}")
triggers.append(
pydantic.TypeAdapter(DeploymentTriggerTypes).validate_python(spec)
)
triggers.append(DeploymentTriggerAdapter.validate_python(spec))

return triggers


async def _create_deployment_triggers(
client: "PrefectClient",
deployment_id: UUID,
triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]],
triggers: list[DeploymentTriggerTypes | TriggerTypes],
):
try:
# The triggers defined in the deployment spec are, essentially,
@@ -1701,8 +1706,8 @@ async def _create_deployment_triggers(


def _gather_deployment_trigger_definitions(
trigger_flags: List[str], existing_triggers: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
trigger_flags: list[str], existing_triggers: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""Parses trigger flags from CLI and existing deployment config in `prefect.yaml`.

Args:
@@ -1717,7 +1722,7 @@ def _gather_deployment_trigger_definitions(
"""

if trigger_flags:
trigger_specs = []
trigger_specs: list[dict[str, Any]] = []
for t in trigger_flags:
try:
if t.endswith(".yaml"):
@@ -1735,7 +1740,7 @@
return existing_triggers


def _handle_deprecated_schedule_fields(deploy_config: Dict):
def _handle_deprecated_schedule_fields(deploy_config: dict[str, Any]):
deploy_config = deepcopy(deploy_config)

legacy_schedule = deploy_config.get("schedule", NotSet)
@@ -1779,7 +1784,7 @@ def _gather_deployment_sla_definitions(
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags is not None:
sla_specs = []
sla_specs: list[dict[str, Any]] = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
@@ -1812,7 +1817,7 @@ def _initialize_deployment_slas(
if sla_specs == [] or sla_specs == [[]]:
return []

slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]
slas = [SlaAdapter.validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)
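
A recurring change in deploy.py above is hoisting pydantic adapter construction to module scope: DeploymentTriggerAdapter and SlaAdapter are built once and reused, instead of calling pydantic.TypeAdapter(...) inside each validation loop. The sketch below shows that pattern with a simplified stand-in union; the real code validates Prefect's DeploymentTriggerTypes and SlaTypes.

from typing import Union

from pydantic import BaseModel, TypeAdapter

class CronTrigger(BaseModel):
    cron: str

class IntervalTrigger(BaseModel):
    seconds: int

TriggerSpec = Union[CronTrigger, IntervalTrigger]

# Built once at import time and shared by every call, rather than constructing a
# new TypeAdapter for each spec inside the loop.
TriggerAdapter: TypeAdapter[TriggerSpec] = TypeAdapter(TriggerSpec)

def parse_triggers(specs: list[dict]) -> list[TriggerSpec]:
    return [TriggerAdapter.validate_python(spec) for spec in specs]

print(parse_triggers([{"cron": "0 * * * *"}, {"seconds": 30}]))

Building a TypeAdapter is comparatively expensive in pydantic v2 (it compiles a validation core schema), so sharing one module-level adapter avoids repeating that work for every trigger or SLA spec that gets parsed.
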
28 changes: 17 additions & 11 deletions tests/cli/test_dashboard.py
@@ -101,15 +101,20 @@ def test_open_current_workspace_in_browser_failure_no_workspace_set(
)
)

respx_mock.get(PREFECT_CLOUD_API_URL.value() + "/me/workspaces").mock(
return_value=httpx.Response(
status.HTTP_200_OK,
json=[],
if "prefect.cloud" in api_url:
respx_mock.get(PREFECT_CLOUD_API_URL.value() + "/me/workspaces").mock(
return_value=httpx.Response(
status.HTTP_200_OK,
json=[],
)
)
)

with _use_profile("logged-in-profile"):
invoke_and_assert(["dashboard", "open"], expected_code=0)
invoke_and_assert(
["dashboard", "open"],
expected_code=0,
expected_output_contains=f"Opened {api_url!r} in browser.",
)


@pytest.mark.usefixtures("mock_webbrowser")
@@ -130,12 +135,13 @@ def test_open_current_workspace_in_browser_failure_unauthorized(respx_mock, api_
)
)

respx_mock.get(PREFECT_CLOUD_API_URL.value() + "/me/workspaces").mock(
return_value=httpx.Response(
status.HTTP_401_UNAUTHORIZED,
json={"detail": "Unauthorized"},
if "prefect.cloud" in api_url:
respx_mock.get(PREFECT_CLOUD_API_URL.value() + "/me/workspaces").mock(
return_value=httpx.Response(
status.HTTP_401_UNAUTHORIZED,
json={"detail": "Unauthorized"},
)
)
)

with _use_profile("logged-in-profile"):
invoke_and_assert(
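
The test updates mirror the new behavior: the /me/workspaces mock is registered only when the parametrized API URL points at Prefect Cloud, and for non-Cloud URLs the command is expected to exit successfully with the new "Opened ... in browser." message. Because respx fails a test when an unmocked request is made, leaving the Cloud route unregistered is what actually proves no extra Cloud calls happen. A small self-contained sketch of that conditional-mock idea follows; fetch_workspaces_if_cloud and check are toy stand-ins invented for illustration, not Prefect test helpers.

import httpx
import respx

def fetch_workspaces_if_cloud(api_url: str):
    # Toy stand-in for the CLI behavior: only Cloud URLs trigger /me/workspaces.
    if "prefect.cloud" not in api_url:
        return None
    return httpx.get(api_url + "/me/workspaces").json()

@respx.mock
def check(api_url: str) -> None:
    # Register the Cloud route only when it can legitimately be hit; under respx,
    # a request that matches no route raises, so an accidental Cloud call made for
    # an OSS URL would fail this check.
    if "prefect.cloud" in api_url:
        respx.get(api_url + "/me/workspaces").mock(
            return_value=httpx.Response(200, json=[])
        )
    assert fetch_workspaces_if_cloud(api_url) in ([], None)

check("https://api.prefect.cloud/api")
check("http://localhost:4200/api")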