diff --git a/src/prefect/_experimental/sla/__init__.py b/src/prefect/_experimental/sla/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/src/prefect/_experimental/sla/client.py b/src/prefect/_experimental/sla/client.py
new file mode 100644
index 000000000000..98e3d5246127
--- /dev/null
+++ b/src/prefect/_experimental/sla/client.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from prefect.client.orchestration.base import BaseAsyncClient, BaseClient
+
+if TYPE_CHECKING:
+    from uuid import UUID
+
+    from prefect._experimental.sla.objects import SlaTypes
+
+
+class SlaClient(BaseClient):
+    def create_sla(self, sla: "SlaTypes") -> "UUID":
+        """
+        Creates a service level agreement.
+        Args:
+            sla: The SLA to create. Must have a deployment ID set.
+        Raises:
+            httpx.RequestError: if the SLA was not created for any reason
+        Returns:
+            the ID of the SLA in the backend
+        """
+        if not sla.owner_resource:
+            raise ValueError(
+                "Deployment ID is not set. Please set using `set_deployment_id`."
+            )
+
+        response = self.request(
+            "POST",
+            "/slas/",
+            json=sla.model_dump(mode="json", exclude_unset=True),
+        )
+        response.raise_for_status()
+
+        from uuid import UUID
+
+        return UUID(response.json().get("id"))
+
+
+class SlaAsyncClient(BaseAsyncClient):
+    async def create_sla(self, sla: "SlaTypes") -> "UUID":
+        """
+        Creates a service level agreement.
+        Args:
+            sla: The SLA to create. Must have a deployment ID set.
+        Raises:
+            httpx.RequestError: if the SLA was not created for any reason
+        Returns:
+            the ID of the SLA in the backend
+        """
+        if not sla.owner_resource:
+            raise ValueError(
+                "Deployment ID is not set. Please set using `set_deployment_id`."
+            )
+
+        response = await self.request(
+            "POST",
+            "/slas/",
+            json=sla.model_dump(mode="json", exclude_unset=True),
+        )
+        response.raise_for_status()
+
+        from uuid import UUID
+
+        return UUID(response.json().get("id"))
diff --git a/src/prefect/_experimental/sla/objects.py b/src/prefect/_experimental/sla/objects.py
new file mode 100644
index 000000000000..a4e30e618cbd
--- /dev/null
+++ b/src/prefect/_experimental/sla/objects.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+import abc
+from typing import Literal, Optional, Union
+from uuid import UUID
+
+from pydantic import Field, PrivateAttr, computed_field
+from typing_extensions import TypeAlias
+
+from prefect._internal.schemas.bases import PrefectBaseModel
+
+
+class ServiceLevelAgreement(PrefectBaseModel, abc.ABC):
+    """An ORM representation of a Service Level Agreement."""
+
+    _deployment_id: Optional[UUID] = PrivateAttr(default=None)
+
+    name: str = Field(
+        default=...,
+        description="The name of the SLA.
Names must be unique on a per-deployment basis.", + ) + severity: Literal["minor", "low", "moderate", "high", "critical"] = Field( + default="moderate", + description="The severity of the SLA.", + ) + enabled: Optional[bool] = Field( + default=True, + description="Whether the SLA is enabled.", + ) + + def set_deployment_id(self, deployment_id: UUID): + self._deployment_id = deployment_id + return self + + @computed_field + @property + def owner_resource(self) -> Union[str, None]: + if self._deployment_id: + return f"prefect.deployment.{self._deployment_id}" + return None + + +class TimeToCompletionSla(ServiceLevelAgreement): + """An SLA that triggers when a flow run takes longer than the specified duration.""" + + duration: int = Field( + default=..., + description="The maximum flow run duration allowed before the SLA is violated, expressed in seconds.", + ) + + +# Concrete SLA types +SlaTypes: TypeAlias = Union[TimeToCompletionSla] diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index cf75f7cf00cd..34ca06e07179 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -1,5 +1,7 @@ """Module containing implementation for deploying flows.""" +from __future__ import annotations + import json import os import re @@ -19,6 +21,7 @@ from yaml.error import YAMLError import prefect +from prefect._experimental.sla.objects import SlaTypes from prefect._internal.compatibility.deprecated import ( generate_deprecation_message, ) @@ -40,6 +43,7 @@ exit_with_error, ) from prefect.cli.root import app, is_interactive +from prefect.client.base import ServerType from prefect.client.schemas.actions import DeploymentScheduleCreate from prefect.client.schemas.filters import WorkerFilter from prefect.client.schemas.objects import ConcurrencyLimitConfig @@ -350,6 +354,12 @@ async def deploy( "--prefect-file", help="Specify a custom path to a prefect.yaml file", ), + sla: List[str] = typer.Option( + None, + "--sla", + help="Experimental: One or more SLA configurations for the deployment. May be" + " removed or modified at any time. Currently only supported on Prefect Cloud.", + ), ): """ Create a deployment to deploy a flow from this project. @@ -405,6 +415,7 @@ async def deploy( "triggers": trigger, "param": param, "params": params, + "sla": sla, } try: deploy_configs, actions = _load_deploy_configs_and_actions( @@ -734,6 +745,14 @@ async def _run_single_deploy( await _create_deployment_triggers(client, deployment_id, triggers) + if sla_specs := _gather_deployment_sla_definitions( + options.get("sla"), deploy_config.get("sla") + ): + slas = _initialize_deployment_slas(deployment_id, sla_specs) + await _create_slas(client, slas) + else: + slas = [] + app.console.print( Panel( f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'" @@ -791,6 +810,7 @@ async def _run_single_deploy( push_steps=push_steps or None, pull_steps=pull_steps or None, triggers=trigger_specs or None, + sla=sla_specs or None, prefect_file=prefect_file, ) app.console.print( @@ -1737,3 +1757,71 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict): ) return deploy_config + + +def _gather_deployment_sla_definitions( + sla_flags: Union[list[str], None], existing_slas: Union[list[dict[str, Any]], None] +) -> Union[list[dict[str, Any]], None]: + """Parses SLA flags from CLI and existing deployment config in `prefect.yaml`. + Prefers CLI-provided SLAs over config in `prefect.yaml`. 
+ """ + if sla_flags: + sla_specs = [] + for s in sla_flags: + try: + if s.endswith(".yaml"): + with open(s, "r") as f: + sla_specs.extend(yaml.safe_load(f).get("sla", [])) + elif s.endswith(".json"): + with open(s, "r") as f: + sla_specs.extend(json.load(f).get("sla", [])) + else: + sla_specs.append(json.loads(s)) + except Exception as e: + raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}") + return sla_specs + + return existing_slas + + +def _initialize_deployment_slas( + deployment_id: UUID, sla_specs: list[dict[str, Any]] +) -> list[SlaTypes]: + """Initializes SLAs for a deployment. + + Args: + deployment_id: Deployment ID. + sla_specs: SLA specification dictionaries. + + Returns: + List of SLAs. + """ + slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs] + + for sla in slas: + sla.set_deployment_id(deployment_id) + + return slas + + +async def _create_slas( + client: "PrefectClient", + slas: List[SlaTypes], +): + if client.server_type == ServerType.CLOUD: + exceptions = [] + for sla in slas: + try: + await client.create_sla(sla) + except Exception as e: + app.console.print( + f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""", + style="red", + ) + exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e)) + if exceptions: + raise ValueError("Failed to create one or more SLAs.", exceptions) + else: + raise ValueError( + "SLA configuration is currently only supported on Prefect Cloud." + ) diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 8ccd288ec738..990d38fff1c3 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -48,6 +48,7 @@ AutomationClient, AutomationAsyncClient, ) +from prefect._experimental.sla.client import SlaClient, SlaAsyncClient from prefect.client.orchestration._flows.client import ( FlowClient, @@ -249,6 +250,7 @@ class PrefectClient( ConcurrencyLimitAsyncClient, DeploymentAsyncClient, AutomationAsyncClient, + SlaAsyncClient, FlowRunAsyncClient, FlowAsyncClient, ): @@ -1863,6 +1865,7 @@ class SyncPrefectClient( ConcurrencyLimitClient, DeploymentClient, AutomationClient, + SlaClient, FlowRunClient, FlowClient, ): diff --git a/src/prefect/deployments/base.py b/src/prefect/deployments/base.py index ec2b33cb1632..9ec16a909fff 100644 --- a/src/prefect/deployments/base.py +++ b/src/prefect/deployments/base.py @@ -5,6 +5,8 @@ To get started, follow along with [the deloyments tutorial](/tutorials/deployments/). 
""" +from __future__ import annotations + import os from copy import deepcopy from pathlib import Path @@ -275,6 +277,7 @@ def _save_deployment_to_prefect_file( push_steps: Optional[List[Dict]] = None, pull_steps: Optional[List[Dict]] = None, triggers: Optional[List[Dict]] = None, + sla: Optional[list[dict]] = None, prefect_file: Path = Path("prefect.yaml"), ): """ @@ -319,6 +322,9 @@ def _save_deployment_to_prefect_file( if triggers and triggers != parsed_prefect_file_contents.get("triggers"): deployment["triggers"] = triggers + if sla and sla != parsed_prefect_file_contents.get("sla"): + deployment["sla"] = sla + deployments = parsed_prefect_file_contents.get("deployments") if deployments is None: parsed_prefect_file_contents["deployments"] = [deployment] diff --git a/src/prefect/deployments/runner.py b/src/prefect/deployments/runner.py index 60292f7f6638..2ce0f4069d6f 100644 --- a/src/prefect/deployments/runner.py +++ b/src/prefect/deployments/runner.py @@ -36,6 +36,7 @@ def fast_flow(): from typing import TYPE_CHECKING, Any, ClassVar, Iterable, List, Optional, Union from uuid import UUID +from exceptiongroup import ExceptionGroup # novermin from pydantic import ( BaseModel, ConfigDict, @@ -48,12 +49,14 @@ def fast_flow(): from rich.progress import Progress, SpinnerColumn, TextColumn, track from rich.table import Table +from prefect._experimental.sla.objects import SlaTypes from prefect._internal.concurrency.api import create_call, from_async from prefect._internal.schemas.validators import ( reconcile_paused_deployment, reconcile_schedules_runner, ) -from prefect.client.orchestration import get_client +from prefect.client.base import ServerType +from prefect.client.orchestration import PrefectClient, get_client from prefect.client.schemas.actions import DeploymentScheduleCreate from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus from prefect.client.schemas.objects import ( @@ -128,6 +131,7 @@ class RunnerDeployment(BaseModel): job_variables: Settings used to override the values specified default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings. + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. """ model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True) @@ -207,6 +211,10 @@ class RunnerDeployment(BaseModel): " a built runner." ), ) + # (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = PrivateAttr( + default=None, + ) _entrypoint_type: EntrypointType = PrivateAttr( default=EntrypointType.FILE_PATH, ) @@ -351,8 +359,32 @@ async def apply( trigger.set_deployment_id(deployment_id) await client.create_automation(trigger.as_automation()) + # We plan to support SLA configuration on the Prefect Server in the future. + # For now, we only support it on Prefect Cloud. 
+ if self._sla: + await self._create_slas(deployment_id, client) + return deployment_id + async def _create_slas(self, deployment_id: UUID, client: PrefectClient): + if not isinstance(self._sla, list): + self._sla = [self._sla] + + if client.server_type == ServerType.CLOUD: + exceptions = [] + for sla in self._sla: + try: + sla.set_deployment_id(deployment_id) + await client.create_sla(sla) + except Exception as e: + exceptions.append(e) + if exceptions: + raise ExceptionGroup("Failed to create SLAs", exceptions) # novermin + else: + raise ValueError( + "SLA configuration is currently only supported on Prefect Cloud." + ) + @staticmethod def _construct_deployment_schedules( interval: Optional[ @@ -467,6 +499,7 @@ def from_flow( work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental ) -> "RunnerDeployment": """ Configure a deployment for a given flow. @@ -497,6 +530,7 @@ def from_flow( job_variables: Settings used to override the values specified default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings. + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. """ constructed_schedules = cls._construct_deployment_schedules( interval=interval, @@ -532,6 +566,7 @@ def from_flow( work_queue_name=work_queue_name, job_variables=job_variables, ) + deployment._sla = _sla if not deployment.entrypoint: no_file_location_error = ( @@ -607,6 +642,7 @@ def from_entrypoint( work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental ) -> "RunnerDeployment": """ Configure a deployment for a given flow located at a given entrypoint. @@ -638,6 +674,7 @@ def from_entrypoint( job_variables: Settings used to override the values specified default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings. + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. """ from prefect.flows import load_flow_from_entrypoint @@ -677,6 +714,7 @@ def from_entrypoint( work_queue_name=work_queue_name, job_variables=job_variables, ) + deployment._sla = _sla deployment._path = str(Path.cwd()) cls._set_defaults_from_flow(deployment, flow) @@ -708,6 +746,7 @@ async def from_storage( work_pool_name: Optional[str] = None, work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental ): """ Create a RunnerDeployment from a flow located at a given entrypoint and stored in a @@ -739,6 +778,7 @@ async def from_storage( job_variables: Settings used to override the values specified default base job template of the chosen work pool. Refer to the base job template of the chosen work pool for available settings. + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 
""" from prefect.flows import load_flow_from_entrypoint @@ -787,6 +827,7 @@ async def from_storage( work_queue_name=work_queue_name, job_variables=job_variables, ) + deployment._sla = _sla deployment._path = str(storage.destination).replace( tmpdir, "$STORAGE_BASE_PATH" ) diff --git a/src/prefect/flows.py b/src/prefect/flows.py index cbd763952c2f..279af4d2bdae 100644 --- a/src/prefect/flows.py +++ b/src/prefect/flows.py @@ -2,6 +2,8 @@ Module containing the base workflow class and decorator - for most use cases, using the [`@flow` decorator][prefect.flows.flow] is preferred. """ +from __future__ import annotations + # This file requires type-checking with pyright because mypy does not yet support PEP612 # See https://github.com/python/mypy/issues/8645 import ast @@ -43,6 +45,7 @@ from rich.console import Console from typing_extensions import Literal, ParamSpec, TypeAlias +from prefect._experimental.sla.objects import SlaTypes from prefect._internal.concurrency.api import create_call, from_async from prefect.blocks.core import Block from prefect.client.schemas.actions import DeploymentScheduleCreate @@ -651,6 +654,7 @@ async def to_deployment( work_queue_name: Optional[str] = None, job_variables: Optional[dict[str, Any]] = None, entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental ) -> "RunnerDeployment": """ Creates a runner deployment object for this flow. @@ -681,6 +685,7 @@ async def to_deployment( of the chosen work pool. Refer to the base job template of the chosen work pool for entrypoint_type: Type of entrypoint to use for the deployment. When using a module path entrypoint, ensure that the module will be importable in the execution environment. + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. Examples: Prepare two deployments and serve them: @@ -728,6 +733,7 @@ def my_other_flow(name): work_pool_name=work_pool_name, work_queue_name=work_queue_name, job_variables=job_variables, + _sla=_sla, ) # type: ignore # TODO: remove sync_compatible else: return RunnerDeployment.from_flow( @@ -749,6 +755,7 @@ def my_other_flow(name): work_queue_name=work_queue_name, job_variables=job_variables, entrypoint_type=entrypoint_type, + _sla=_sla, ) def on_completion(self, fn: StateHookCallable) -> StateHookCallable: @@ -1061,6 +1068,7 @@ async def deploy( entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, print_next_steps: bool = True, ignore_warnings: bool = False, + _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, ) -> UUID: """ Deploys a flow to run on dynamic infrastructure via a work pool. @@ -1112,7 +1120,7 @@ async def deploy( print_next_steps_message: Whether or not to print a message with next steps after deploying the deployments. ignore_warnings: Whether or not to ignore warnings about the work pool type. - + _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. Returns: The ID of the created/updated deployment. 
@@ -1190,6 +1198,7 @@ def my_flow(name): work_queue_name=work_queue_name, job_variables=job_variables, entrypoint_type=entrypoint_type, + _sla=_sla, ) from prefect.deployments.runner import deploy diff --git a/tests/experimental/test_sla.py b/tests/experimental/test_sla.py new file mode 100644 index 000000000000..dc5178bf583c --- /dev/null +++ b/tests/experimental/test_sla.py @@ -0,0 +1,765 @@ +import json +import shutil +import sys +from datetime import timedelta +from pathlib import Path +from time import sleep +from unittest import mock +from uuid import UUID, uuid4 + +import httpx +import pytest +import readchar +import respx +import yaml +from exceptiongroup import ExceptionGroup # novermin +from typer import Exit + +import prefect +from prefect import flow +from prefect._experimental.sla.objects import ( + ServiceLevelAgreement, + TimeToCompletionSla, +) +from prefect.cli.deploy import ( + _create_slas, + _initialize_deployment_slas, +) +from prefect.client.base import ServerType +from prefect.client.orchestration import PrefectClient, get_client +from prefect.client.schemas.actions import WorkPoolCreate +from prefect.client.schemas.objects import WorkPool +from prefect.deployments.base import initialize_project +from prefect.deployments.runner import RunnerDeployment +from prefect.settings import ( + PREFECT_API_URL, + temporary_settings, +) +from prefect.testing.cli import invoke_and_assert +from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.utilities.filesystem import tmpchdir + +TEST_PROJECTS_DIR = prefect.__development_base_path__ / "tests" / "test-projects" + + +@pytest.fixture +def project_dir(tmp_path): + with tmpchdir(tmp_path): + shutil.copytree(TEST_PROJECTS_DIR, tmp_path, dirs_exist_ok=True) + prefect_home = tmp_path / ".prefect" + prefect_home.mkdir(exist_ok=True, mode=0o0700) + initialize_project() + yield tmp_path + + +@pytest.fixture +async def docker_work_pool(prefect_client: PrefectClient) -> WorkPool: + return await prefect_client.create_work_pool( + work_pool=WorkPoolCreate( + name="test-docker-work-pool", + type="docker", + base_job_template={ + "job_configuration": {"image": "{{ image}}"}, + "variables": { + "type": "object", + "properties": { + "image": { + "title": "Image", + "type": "string", + }, + }, + }, + }, + ) + ) + + +@pytest.fixture +def interactive_console(monkeypatch): + monkeypatch.setattr("prefect.cli.deploy.is_interactive", lambda: True) + + # `readchar` does not like the fake stdin provided by typer isolation so we provide + # a version that does not require a fd to be attached + def readchar(): + sys.stdin.flush() + position = sys.stdin.tell() + if not sys.stdin.read(): + print("TEST ERROR: CLI is attempting to read input but stdin is empty.") + raise Exit(-2) + else: + sys.stdin.seek(position) + return sys.stdin.read(1) + + monkeypatch.setattr("readchar._posix_read.readchar", readchar) + + +@flow() +def tired_flow(): + print("I am so tired...") + + for _ in range(100): + print("zzzzz...") + sleep(5) + + +class TestSla: + async def test_create_sla(self): + sla = ServiceLevelAgreement( + name="test-sla", + ) + deployment_id = uuid4() + sla.set_deployment_id(deployment_id) + assert sla.owner_resource == f"prefect.deployment.{deployment_id}" + + +class TestClientCreateSla: + async def test_create_sla_against_cloud(self): + account_id = uuid4() + workspace_id = uuid4() + with temporary_settings( + updates={ + PREFECT_API_URL: f"https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/" + } + ): + 
with respx.mock( + assert_all_mocked=True, + assert_all_called=False, + base_url="https://api.prefect.cloud/api", + using="httpx", + ) as router: + sla_id = str(uuid4()) + + router.get("/csrf-token", params={"client": mock.ANY}).pass_through() + router.post( + f"/accounts/{account_id}/workspaces/{workspace_id}/slas/", + ).mock( + return_value=httpx.Response( + status_code=201, + json={"id": sla_id}, + ) + ) + prefect_client = get_client() + + deployment_id = uuid4() + sla = TimeToCompletionSla( + name="test-sla", + duration=timedelta(minutes=10).total_seconds(), + ) + sla.set_deployment_id(deployment_id) + response_id = await prefect_client.create_sla(sla) + assert response_id == UUID(sla_id) + + +class TestRunnerDeploymentApply: + async def test_runner_deployment_calls_internal_method_on_apply_with_sla( + self, monkeypatch + ): + sla = TimeToCompletionSla( + name="test-sla", + duration=timedelta(minutes=10).total_seconds(), + ) + deployment = RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + _sla=sla, + ) + monkeypatch.setattr( + deployment, "_create_slas", mock.AsyncMock(name="mock_create_slas") + ) + + await deployment.apply() + + assert deployment._create_slas.called + + @pytest.fixture + def client(self, monkeypatch, prefect_client): + monkeypatch.setattr(prefect_client, "server_type", ServerType.CLOUD) + + monkeypatch.setattr( + prefect_client, "create_sla", mock.AsyncMock(name="mock_create_sla") + ) + return prefect_client + + async def test_create_deployment_with_sla_config_against_cloud( + self, deployment, client + ): + sla = TimeToCompletionSla( + name="test-sla", + duration=timedelta(minutes=10).total_seconds(), + ) + deployment = RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + _sla=sla, + ) + await deployment._create_slas(uuid4(), client) + assert client.create_sla.await_args_list[0].args[0].name == sla.name + assert client.create_sla.await_args_list[0].args[0].duration == sla.duration + + async def test_create_deployment_with_multiple_slas_against_cloud(self, client): + sla1 = TimeToCompletionSla( + name="a little long", + severity="moderate", + duration=timedelta(minutes=10).total_seconds(), + ) + sla2 = TimeToCompletionSla( + name="whoa this is bad", + severity="high", + duration=timedelta(minutes=30).total_seconds(), + ) + deployment = RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + _sla=[sla1, sla2], + ) + await deployment._create_slas(uuid4(), client) + calls = client.create_sla.await_args_list + assert len(calls) == 2 + assert calls[0].args[0].name == sla1.name + assert calls[1].args[0].name == sla2.name + + async def test_create_deployment_against_oss_server_produces_error_log( + self, prefect_client + ): + sla = TimeToCompletionSla( + name="test-sla", + duration=timedelta(minutes=10).total_seconds(), + ) + deployment = RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + _sla=sla, + ) + + with pytest.raises( + ValueError, + match="SLA configuration is currently only supported on Prefect Cloud.", + ): + await deployment._create_slas(uuid4(), prefect_client) + + async def test_failure_to_create_one_sla_does_not_prevent_other_slas_from_being_created( + self, client + ): + sla1 = TimeToCompletionSla( + name="a little long", + duration=timedelta(minutes=10).total_seconds(), + ) + + sla2 = TimeToCompletionSla( + name="whoa this is bad", + duration=timedelta(minutes=30).total_seconds(), + ) + + client.create_sla.side_effect = [None, ValueError("Failed to create SLA")] + + deployment = 
RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + _sla=[sla1, sla2], + ) + + with pytest.raises(ExceptionGroup) as exc_info: # novermin + await deployment._create_slas(uuid4(), client) + + assert len(client.create_sla.await_args_list) == 2 + assert client.create_sla.await_args_list[0].args[0].name == sla1.name + assert client.create_sla.await_args_list[1].args[0].name == sla2.name + assert str(exc_info.value) == "Failed to create SLAs (1 sub-exception)" + assert len(exc_info.value.exceptions) == 1 + assert str(exc_info.value.exceptions[0]) == "Failed to create SLA" + + +class TestDeploymentCLI: + class TestSlaSyncing: + async def test_initialize_slas(self): + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec]) + assert slas == [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ).set_deployment_id(deployment_id) + ] + + async def test_initialize_multiple_slas(self): + sla_spec_1 = { + "name": "test-sla-1", + "duration": 1800, + "severity": "high", + } + sla_spec_2 = { + "name": "test-sla-2", + "duration": 3600, + "severity": "critical", + } + + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec_1, sla_spec_2]) + assert slas == [ + TimeToCompletionSla( + name="test-sla-1", + duration=1800, + severity="high", + ).set_deployment_id(deployment_id), + TimeToCompletionSla( + name="test-sla-2", + duration=3600, + severity="critical", + ).set_deployment_id(deployment_id), + ] + + async def test_create_slas(self): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec]) + + await _create_slas(client, slas) + + assert slas[0]._deployment_id == deployment_id + assert slas[0].owner_resource == f"prefect.deployment.{deployment_id}" + client.create_sla.assert_called_once_with(slas[0]) + + async def test_sla_creation_orchestrated( + self, project_dir, prefect_client, work_pool + ): + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + contents["deployments"] = [ + { + "name": "test-name-1", + "work_pool": { + "name": work_pool.name, + }, + "sla": [ + { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + ], + } + ] + + expected_slas = _initialize_deployment_slas( + uuid4(), contents["deployments"][0]["sla"] + ) + + with prefect_file.open(mode="w") as f: + yaml.safe_dump(contents, f) + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command="deploy ./flows/hello.py:my_flow -n test-name-1", + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + class TestSlaPassedViaCLI: + @pytest.mark.usefixtures("project_dir") + async def test_json_string_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + 
duration=1800, + severity="high", + ) + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 --sla" + f" '{json.dumps(sla_spec)}' -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_json_file_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + with open("sla.json", "w") as f: + json.dump({"sla": [sla_spec]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla sla.json -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_yaml_file_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + with open("sla.yaml", "w") as f: + yaml.safe_dump({"sla": [sla_spec]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla sla.yaml -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_nested_yaml_file_sla(self, docker_work_pool, tmpdir): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + slas_file = tmpdir.mkdir("my_stuff") / "sla.yaml" + with open(slas_file, "w") as f: + yaml.safe_dump({"sla": [sla_spec]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla my_stuff/sla.yaml -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = 
create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_multiple_sla_flags(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec_1 = { + "name": "test-sla-1", + "duration": 1800, + "severity": "high", + } + + sla_spec_2 = { + "name": "test-sla-2", + "duration": 3600, + "severity": "critical", + } + + with open("sla.yaml", "w") as f: + yaml.safe_dump({"sla": [sla_spec_2]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla-1", + duration=1800, + severity="high", + ), + TimeToCompletionSla( + name="test-sla-2", + duration=3600, + severity="critical", + ), + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 --sla" + f" '{json.dumps(sla_spec_1)}' --sla sla.yaml -p" + f" {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_override_on_sla_conflict(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + cli_sla_spec = { + "name": "cli-sla", + "duration": 1800, + "severity": "high", + } + + file_sla_spec = { + "name": "file-sla", + "duration": 1800, + "severity": "high", + } + + expected_slas = [ + TimeToCompletionSla( + name="cli-sla", + duration=1800, + severity="high", + ) + ] + + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + contents["deployments"] = [ + { + "name": "test-name-1", + "work_pool": { + "name": docker_work_pool.name, + }, + "slas": [ + file_sla_spec, + ], + } + ] + + with prefect_file.open(mode="w") as f: + yaml.safe_dump(contents, f) + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla '{json.dumps(cli_sla_spec)}'" + ), + expected_code=0, + ) + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + assert len(slas) == 1 + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_invalid_sla_parsing(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + invalid_json_str_sla = "{foo: bar, baz: bat}" + invalid_yaml_sla = "invalid.yaml" + + with open(invalid_yaml_sla, "w") as f: + f.write("pretty please, let me know if the flow runs for too long") + + for invalid_sla in [invalid_json_str_sla, invalid_yaml_sla]: + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ): + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" -p {docker_work_pool.name} --sla '{invalid_sla}'" + ), + 
expected_code=1, + expected_output_contains=["Failed to parse SLA"], + ) + + @pytest.mark.usefixtures("interactive_console", "project_dir") + async def test_slas_saved_to_prefect_yaml( + self, + docker_work_pool, + ): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + cli_sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ): + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 -p" + f" {docker_work_pool.name} --sla" + f" '{json.dumps(cli_sla_spec)}'" + ), + user_input=( + # Decline schedule + "n" + + readchar.key.ENTER + # Decline docker build + + "n" + + readchar.key.ENTER + # Accept save configuration + + "y" + + readchar.key.ENTER + ), + expected_code=0, + ) + + # Read the updated prefect.yaml + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + assert "deployments" in contents + assert "sla" in contents["deployments"][-1] + assert contents["deployments"][-1]["sla"] == [cli_sla_spec]
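
Taken together, these changes let an SLA be attached at deployment time from Python via the new experimental `_sla` keyword on `Flow.to_deployment()` / `Flow.deploy()`. Below is a minimal sketch of that path; the flow, deployment name, work pool, image, and threshold are illustrative assumptions rather than values from this diff, and the call only succeeds against Prefect Cloud because `_create_slas` rejects other server types.

```python
# Minimal sketch (assumed example values) of deploying a flow with an
# experimental SLA attached. Requires Prefect Cloud.
from datetime import timedelta

from prefect import flow
from prefect._experimental.sla.objects import TimeToCompletionSla


@flow
def my_flow():
    print("hello from an SLA-monitored flow")


if __name__ == "__main__":
    # Violated if a flow run of this deployment takes longer than 10 minutes.
    sla = TimeToCompletionSla(
        name="runtime-under-10-minutes",  # assumed SLA name
        severity="high",
        duration=int(timedelta(minutes=10).total_seconds()),
    )

    # `_sla` is the experimental keyword threaded through to_deployment()/deploy()
    # in this diff; it accepts a single SLA or a list of SLAs.
    my_flow.deploy(
        name="my-deployment",              # assumed deployment name
        work_pool_name="my-work-pool",     # assumed work pool
        image="my-registry/my-image:dev",  # assumed image
        _sla=sla,
    )
```

The same configuration can be supplied on the CLI with `prefect deploy ... --sla '<json>'` (or a path to a YAML/JSON file containing an `sla:` key) and is persisted under the deployment's `sla` entry in `prefect.yaml`, as exercised by the tests above.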
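For the lower-level path, the new client mixins expose `create_sla` directly. The SLA must be bound to a deployment ID first; otherwise `owner_resource` is `None` and the client raises `ValueError` before making the request. A sketch assuming an existing Cloud deployment ID:

```python
# Sketch of the client-level call added in this diff. `deployment_id` is
# assumed to reference an already-created deployment on Prefect Cloud.
from uuid import UUID

from prefect._experimental.sla.objects import TimeToCompletionSla
from prefect.client.orchestration import get_client


async def create_runtime_sla(deployment_id: UUID) -> UUID:
    # Violated when a flow run of the deployment exceeds 30 minutes (1800 s).
    sla = TimeToCompletionSla(name="runtime-under-30-minutes", duration=1800)

    # Required: without a deployment ID, create_sla raises ValueError.
    sla.set_deployment_id(deployment_id)

    async with get_client() as client:
        # POSTs to /slas/ and returns the ID of the SLA created in the backend.
        return await client.create_sla(sla)
```

This mirrors what the CLI's `_create_slas` helper and `RunnerDeployment.apply()` do internally once the deployment ID is known.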