From 6108a8160c0691f90c96f3ad133728205288f238 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Tue, 31 Dec 2024 12:51:35 -0500 Subject: [PATCH 1/7] first pass at sla support for prefect deploy --- src/prefect/_experimental/sla.py | 10 ++-- src/prefect/cli/deploy.py | 78 +++++++++++++++++++++++++++++ src/prefect/client/orchestration.py | 5 ++ 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/src/prefect/_experimental/sla.py b/src/prefect/_experimental/sla.py index 4b25c3e01077..49ad869655bf 100644 --- a/src/prefect/_experimental/sla.py +++ b/src/prefect/_experimental/sla.py @@ -31,12 +31,10 @@ def set_deployment_id(self, deployment_id: UUID): @computed_field @property - def owner_resource(self) -> str: - if not self._deployment_id: - raise ValueError( - "Deployment ID is not set. Please set using `set_deployment_id`." - ) - return f"prefect.deployment.{self._deployment_id}" + def owner_resource(self) -> str | None: + if self._deployment_id: + return f"prefect.deployment.{self._deployment_id}" + return None class TimeToCompletionSla(ServiceLevelAgreement): diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index cf75f7cf00cd..fcf7b8b33b79 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -19,6 +19,7 @@ from yaml.error import YAMLError import prefect +from prefect._experimental.sla import SlaTypes from prefect._internal.compatibility.deprecated import ( generate_deprecation_message, ) @@ -40,6 +41,7 @@ exit_with_error, ) from prefect.cli.root import app, is_interactive +from prefect.client.base import ServerType from prefect.client.schemas.actions import DeploymentScheduleCreate from prefect.client.schemas.filters import WorkerFilter from prefect.client.schemas.objects import ConcurrencyLimitConfig @@ -350,6 +352,12 @@ async def deploy( "--prefect-file", help="Specify a custom path to a prefect.yaml file", ), + sla: List[str] = typer.Option( + None, + "--sla", + help="Experimental: One or more SLA configurations for the deployment. May be" + " removed or modified at any time. Currently only supported on Prefect Cloud.", + ), ): """ Create a deployment to deploy a flow from this project. @@ -405,6 +413,7 @@ async def deploy( "triggers": trigger, "param": param, "params": params, + "sla": sla, } try: deploy_configs, actions = _load_deploy_configs_and_actions( @@ -648,6 +657,12 @@ async def _run_single_deploy( else: triggers = [] + sla_specs = _gather_sla_definitions(options.get("sla"), deploy_config.get("sla")) + if len(sla_specs) > 0: + slas = sla_specs + else: + slas = [] + pull_steps = ( pull_steps or actions.get("pull") @@ -733,6 +748,7 @@ async def _run_single_deploy( ) await _create_deployment_triggers(client, deployment_id, triggers) + await _create_slas(client, deployment_id, slas) app.console.print( Panel( @@ -791,6 +807,7 @@ async def _run_single_deploy( push_steps=push_steps or None, pull_steps=pull_steps or None, triggers=trigger_specs or None, + slas=slas or None, prefect_file=prefect_file, ) app.console.print( @@ -1737,3 +1754,64 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict): ) return deploy_config + + +def _gather_sla_definitions( + sla_flags: List[str], existing_slas: List[Dict[str, Any]] +) -> List[Dict[str, Any]]: + """Parses SLA flags from CLI and existing deployment config in `prefect.yaml`. + + Args: + sla_flags: SLAs passed via CLI, either as JSON strings or file paths. + existing_slas: SLAs from existing deployment configuration. + + Returns: + List of SLA specifications. + + Raises: + ValueError: If SLA flag is not a valid JSON string or file path. + """ + + if sla_flags: + sla_specs = [] + for s in sla_flags: + try: + if s.endswith(".yaml"): + with open(s, "r") as f: + sla_specs.extend(yaml.safe_load(f).get("sla", [])) + elif s.endswith(".json"): + with open(s, "r") as f: + sla_specs.extend(json.load(f).get("sla", [])) + else: + sla_specs.append(json.loads(s)) + except Exception as e: + raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}") + return sla_specs + + return existing_slas + + +async def _create_slas( + client: "PrefectClient", + deployment_id: UUID, + slas: List[dict], +): + if client.server_type == ServerType.CLOUD: + exceptions = [] + for sla in slas: + try: + initialized_sla = pydantic.TypeAdapter(SlaTypes).validate_python(sla) + initialized_sla.set_deployment_id(deployment_id) + await client.create_sla(initialized_sla) + except Exception as e: + app.console.print( + f"Failed to create SLA: {sla.get("name")}. Error: {str(e)}", + style="red", + ) + exceptions.append((f"Failed to create SLA: {sla.get('name')}", e)) + if exceptions: + raise ValueError("Failed to create one or more SLAs.", exceptions) + else: + raise ValueError( + "SLA configuration is currently only supported on Prefect Cloud." + ) diff --git a/src/prefect/client/orchestration.py b/src/prefect/client/orchestration.py index dc5b081c7934..f2530d4eee14 100644 --- a/src/prefect/client/orchestration.py +++ b/src/prefect/client/orchestration.py @@ -3518,6 +3518,11 @@ async def create_sla(self, sla: SlaTypes) -> UUID: Returns: the ID of the SLA in the backend """ + if not sla.owner_resource: + raise ValueError( + "Deployment ID is not set. Please set using `set_deployment_id`." + ) + response = await self._client.post( "/slas/", json=sla.model_dump(mode="json", exclude_unset=True), From 13b0ca8e133e95a56960c3d9bb728a49d83d74f2 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 11:09:34 -0500 Subject: [PATCH 2/7] first round of tests --- src/prefect/cli/deploy.py | 31 +++++++----- tests/experimental/test_sla.py | 87 ++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 13 deletions(-) diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index fcf7b8b33b79..f988419ca00e 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -657,10 +657,8 @@ async def _run_single_deploy( else: triggers = [] - sla_specs = _gather_sla_definitions(options.get("sla"), deploy_config.get("sla")) - if len(sla_specs) > 0: - slas = sla_specs - else: + slas = _initialize_deployment_slas(options.get("sla"), deploy_config.get("sla")) + if not slas: slas = [] pull_steps = ( @@ -1756,10 +1754,11 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict): return deploy_config -def _gather_sla_definitions( - sla_flags: List[str], existing_slas: List[Dict[str, Any]] -) -> List[Dict[str, Any]]: +def _initialize_deployment_slas( + sla_flags: List[str] | None, existing_slas: List[Dict[str, Any]] | None +) -> List[SlaTypes] | None: """Parses SLA flags from CLI and existing deployment config in `prefect.yaml`. + Prefers CLI-provided SLAs over config in `prefect.yaml`. Args: sla_flags: SLAs passed via CLI, either as JSON strings or file paths. @@ -1786,23 +1785,29 @@ def _gather_sla_definitions( sla_specs.append(json.loads(s)) except Exception as e: raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}") - return sla_specs + return [ + pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs + ] - return existing_slas + if not existing_slas: + return None + + return [ + pydantic.TypeAdapter(SlaTypes).validate_python(sla) for sla in existing_slas + ] async def _create_slas( client: "PrefectClient", deployment_id: UUID, - slas: List[dict], + slas: List[SlaTypes], ): if client.server_type == ServerType.CLOUD: exceptions = [] for sla in slas: try: - initialized_sla = pydantic.TypeAdapter(SlaTypes).validate_python(sla) - initialized_sla.set_deployment_id(deployment_id) - await client.create_sla(initialized_sla) + sla.set_deployment_id(deployment_id) + await client.create_sla(sla) except Exception as e: app.console.print( f"Failed to create SLA: {sla.get("name")}. Error: {str(e)}", diff --git a/tests/experimental/test_sla.py b/tests/experimental/test_sla.py index ee1eea8a36a9..d8cd3288ec24 100644 --- a/tests/experimental/test_sla.py +++ b/tests/experimental/test_sla.py @@ -1,3 +1,4 @@ +import json from datetime import timedelta from time import sleep from unittest import mock @@ -7,11 +8,16 @@ import pytest import respx +import prefect from prefect import flow from prefect._experimental.sla import ( ServiceLevelAgreement, TimeToCompletionSla, ) +from prefect.cli.deploy import ( + _create_slas, + _initialize_deployment_slas, +) from prefect.client.base import ServerType from prefect.client.orchestration import get_client from prefect.deployments.runner import RunnerDeployment @@ -20,6 +26,8 @@ temporary_settings, ) +TEST_PROJECTS_DIR = prefect.__development_base_path__ / "tests" / "test-projects" + @flow() def tired_flow(): @@ -206,3 +214,82 @@ async def test_failure_to_create_one_sla_does_not_prevent_other_slas_from_being_ str(exc_info.value) == """[("SLA named 'whoa this is bad' failed to create", ValueError('Failed to create SLA'))]""" ) + + +class TestDeploymentCLI: + class TestSlaSyncing: + async def test_initialize_slas(self): + sla_spec = json.dumps( + { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + ) + + slas = _initialize_deployment_slas([sla_spec], None) + assert slas == [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] + + async def test_initialize_slas_prefers_flag_over_config(self): + sla_flag_spec = json.dumps( + { + "name": "test-sla-from-flag", + "duration": 1800, + "severity": "high", + } + ) + + sla_config_spec = { + "name": "test-sla-from-config", + "duration": 3600, + "severity": "low", + } + + slas = _initialize_deployment_slas([sla_flag_spec], [sla_config_spec]) + assert slas == [ + TimeToCompletionSla( + name="test-sla-from-flag", + duration=1800, + severity="high", + ) + ] + + async def test_sla_initialize_falls_back_to_config(self): + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + slas = _initialize_deployment_slas(None, [sla_spec]) + assert slas == [ + TimeToCompletionSla(name="test-sla", duration=1800, severity="high") + ] + + async def test_sla_initialize_handles_file_path_flags(self): + # sla_spec = "tests/test-projects/sla_config.yaml" + pass + + async def test_create_slas(self): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + slas = _initialize_deployment_slas(None, [sla_spec]) + deployment_id = uuid4() + + await _create_slas(client, deployment_id, slas) + + assert slas[0]._deployment_id == deployment_id + assert slas[0].owner_resource == f"prefect.deployment.{deployment_id}" + client.create_sla.assert_called_once_with(slas[0]) From b82c015ef8c56a21cb8411417981d1c8451f3f71 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 12:38:13 -0500 Subject: [PATCH 3/7] moar better tests for SLA deployment cli --- src/prefect/_experimental/sla.py | 1 + src/prefect/cli/deploy.py | 61 ++-- src/prefect/deployments/base.py | 4 + tests/experimental/test_sla.py | 552 +++++++++++++++++++++++++++++-- 4 files changed, 553 insertions(+), 65 deletions(-) diff --git a/src/prefect/_experimental/sla.py b/src/prefect/_experimental/sla.py index 49ad869655bf..9d15e1fdb4bc 100644 --- a/src/prefect/_experimental/sla.py +++ b/src/prefect/_experimental/sla.py @@ -28,6 +28,7 @@ class ServiceLevelAgreement(PrefectBaseModel, abc.ABC): def set_deployment_id(self, deployment_id: UUID): self._deployment_id = deployment_id + return self @computed_field @property diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index f988419ca00e..6478be9e1a55 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -657,10 +657,6 @@ async def _run_single_deploy( else: triggers = [] - slas = _initialize_deployment_slas(options.get("sla"), deploy_config.get("sla")) - if not slas: - slas = [] - pull_steps = ( pull_steps or actions.get("pull") @@ -746,7 +742,14 @@ async def _run_single_deploy( ) await _create_deployment_triggers(client, deployment_id, triggers) - await _create_slas(client, deployment_id, slas) + + if sla_specs := _gather_deployment_sla_definitions( + options.get("sla"), deploy_config.get("sla") + ): + slas = _initialize_deployment_slas(deployment_id, sla_specs) + await _create_slas(client, slas) + else: + slas = [] app.console.print( Panel( @@ -805,7 +808,7 @@ async def _run_single_deploy( push_steps=push_steps or None, pull_steps=pull_steps or None, triggers=trigger_specs or None, - slas=slas or None, + sla=sla_specs or None, prefect_file=prefect_file, ) app.console.print( @@ -1754,23 +1757,12 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict): return deploy_config -def _initialize_deployment_slas( +def _gather_deployment_sla_definitions( sla_flags: List[str] | None, existing_slas: List[Dict[str, Any]] | None -) -> List[SlaTypes] | None: +) -> List[Dict[str, Any]] | None: """Parses SLA flags from CLI and existing deployment config in `prefect.yaml`. Prefers CLI-provided SLAs over config in `prefect.yaml`. - - Args: - sla_flags: SLAs passed via CLI, either as JSON strings or file paths. - existing_slas: SLAs from existing deployment configuration. - - Returns: - List of SLA specifications. - - Raises: - ValueError: If SLA flag is not a valid JSON string or file path. """ - if sla_flags: sla_specs = [] for s in sla_flags: @@ -1785,28 +1777,39 @@ def _initialize_deployment_slas( sla_specs.append(json.loads(s)) except Exception as e: raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}") - return [ - pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs - ] + return sla_specs - if not existing_slas: - return None + return existing_slas - return [ - pydantic.TypeAdapter(SlaTypes).validate_python(sla) for sla in existing_slas - ] + +def _initialize_deployment_slas( + deployment_id: UUID, sla_specs: List[Dict[str, Any]] +) -> List[SlaTypes] | None: + """Initializes SLAs for a deployment. + + Args: + deployment_id: Deployment ID. + sla_specs: SLA specification dictionaries. + + Returns: + List of SLAs. + """ + slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs] + + for sla in slas: + sla.set_deployment_id(deployment_id) + + return slas async def _create_slas( client: "PrefectClient", - deployment_id: UUID, slas: List[SlaTypes], ): if client.server_type == ServerType.CLOUD: exceptions = [] for sla in slas: try: - sla.set_deployment_id(deployment_id) await client.create_sla(sla) except Exception as e: app.console.print( diff --git a/src/prefect/deployments/base.py b/src/prefect/deployments/base.py index ec2b33cb1632..a9f5207f5f99 100644 --- a/src/prefect/deployments/base.py +++ b/src/prefect/deployments/base.py @@ -275,6 +275,7 @@ def _save_deployment_to_prefect_file( push_steps: Optional[List[Dict]] = None, pull_steps: Optional[List[Dict]] = None, triggers: Optional[List[Dict]] = None, + sla: Optional[List[Dict]] = None, prefect_file: Path = Path("prefect.yaml"), ): """ @@ -319,6 +320,9 @@ def _save_deployment_to_prefect_file( if triggers and triggers != parsed_prefect_file_contents.get("triggers"): deployment["triggers"] = triggers + if sla and sla != parsed_prefect_file_contents.get("sla"): + deployment["sla"] = sla + deployments = parsed_prefect_file_contents.get("deployments") if deployments is None: parsed_prefect_file_contents["deployments"] = [deployment] diff --git a/tests/experimental/test_sla.py b/tests/experimental/test_sla.py index d8cd3288ec24..ea1885055d36 100644 --- a/tests/experimental/test_sla.py +++ b/tests/experimental/test_sla.py @@ -1,12 +1,18 @@ import json +import shutil +import sys from datetime import timedelta +from pathlib import Path from time import sleep from unittest import mock from uuid import UUID, uuid4 import httpx import pytest +import readchar import respx +import yaml +from typer import Exit import prefect from prefect import flow @@ -19,16 +25,73 @@ _initialize_deployment_slas, ) from prefect.client.base import ServerType -from prefect.client.orchestration import get_client +from prefect.client.orchestration import PrefectClient, get_client +from prefect.client.schemas.actions import WorkPoolCreate +from prefect.client.schemas.objects import WorkPool +from prefect.deployments.base import initialize_project from prefect.deployments.runner import RunnerDeployment from prefect.settings import ( PREFECT_API_URL, temporary_settings, ) +from prefect.testing.cli import invoke_and_assert +from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.utilities.filesystem import tmpchdir TEST_PROJECTS_DIR = prefect.__development_base_path__ / "tests" / "test-projects" +@pytest.fixture +def project_dir(tmp_path): + with tmpchdir(tmp_path): + shutil.copytree(TEST_PROJECTS_DIR, tmp_path, dirs_exist_ok=True) + prefect_home = tmp_path / ".prefect" + prefect_home.mkdir(exist_ok=True, mode=0o0700) + initialize_project() + yield tmp_path + + +@pytest.fixture +async def docker_work_pool(prefect_client: PrefectClient) -> WorkPool: + return await prefect_client.create_work_pool( + work_pool=WorkPoolCreate( + name="test-docker-work-pool", + type="docker", + base_job_template={ + "job_configuration": {"image": "{{ image}}"}, + "variables": { + "type": "object", + "properties": { + "image": { + "title": "Image", + "type": "string", + }, + }, + }, + }, + ) + ) + + +@pytest.fixture +def interactive_console(monkeypatch): + monkeypatch.setattr("prefect.cli.deploy.is_interactive", lambda: True) + + # `readchar` does not like the fake stdin provided by typer isolation so we provide + # a version that does not require a fd to be attached + def readchar(): + sys.stdin.flush() + position = sys.stdin.tell() + if not sys.stdin.read(): + print("TEST ERROR: CLI is attempting to read input but stdin is empty.") + raise Exit(-2) + else: + sys.stdin.seek(position) + return sys.stdin.read(1) + + monkeypatch.setattr("readchar._posix_read.readchar", readchar) + + @flow() def tired_flow(): print("I am so tired...") @@ -219,63 +282,256 @@ async def test_failure_to_create_one_sla_does_not_prevent_other_slas_from_being_ class TestDeploymentCLI: class TestSlaSyncing: async def test_initialize_slas(self): - sla_spec = json.dumps( - { - "name": "test-sla", - "duration": 1800, - "severity": "high", - } - ) + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } - slas = _initialize_deployment_slas([sla_spec], None) + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec]) assert slas == [ TimeToCompletionSla( name="test-sla", duration=1800, severity="high", - ) + ).set_deployment_id(deployment_id) ] - async def test_initialize_slas_prefers_flag_over_config(self): - sla_flag_spec = json.dumps( + async def test_initialize_multiple_slas(self): + sla_spec_1 = { + "name": "test-sla-1", + "duration": 1800, + "severity": "high", + } + sla_spec_2 = { + "name": "test-sla-2", + "duration": 3600, + "severity": "critical", + } + + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec_1, sla_spec_2]) + assert slas == [ + TimeToCompletionSla( + name="test-sla-1", + duration=1800, + severity="high", + ).set_deployment_id(deployment_id), + TimeToCompletionSla( + name="test-sla-2", + duration=3600, + severity="critical", + ).set_deployment_id(deployment_id), + ] + + async def test_create_slas(self): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + deployment_id = uuid4() + slas = _initialize_deployment_slas(deployment_id, [sla_spec]) + + await _create_slas(client, slas) + + assert slas[0]._deployment_id == deployment_id + assert slas[0].owner_resource == f"prefect.deployment.{deployment_id}" + client.create_sla.assert_called_once_with(slas[0]) + + async def test_sla_creation_orchestrated( + self, project_dir, prefect_client, work_pool + ): + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + contents["deployments"] = [ { - "name": "test-sla-from-flag", - "duration": 1800, - "severity": "high", + "name": "test-name-1", + "work_pool": { + "name": work_pool.name, + }, + "sla": [ + { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + ], } + ] + + expected_slas = _initialize_deployment_slas( + uuid4(), contents["deployments"][0]["sla"] ) - sla_config_spec = { - "name": "test-sla-from-config", - "duration": 3600, - "severity": "low", + with prefect_file.open(mode="w") as f: + yaml.safe_dump(contents, f) + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command="deploy ./flows/hello.py:my_flow -n test-name-1", + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + class TestSlaPassedViaCLI: + @pytest.mark.usefixtures("project_dir") + async def test_json_string_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", } - slas = _initialize_deployment_slas([sla_flag_spec], [sla_config_spec]) - assert slas == [ + expected_slas = [ TimeToCompletionSla( - name="test-sla-from-flag", + name="test-sla", duration=1800, severity="high", ) ] - async def test_sla_initialize_falls_back_to_config(self): + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 --sla" + f" '{json.dumps(sla_spec)}' -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_json_file_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + sla_spec = { "name": "test-sla", "duration": 1800, "severity": "high", } - slas = _initialize_deployment_slas(None, [sla_spec]) - assert slas == [ - TimeToCompletionSla(name="test-sla", duration=1800, severity="high") + + with open("sla.json", "w") as f: + json.dump({"sla": [sla_spec]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) ] - async def test_sla_initialize_handles_file_path_flags(self): - # sla_spec = "tests/test-projects/sla_config.yaml" - pass + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla sla.json -p {docker_work_pool.name}" + ), + expected_code=0, + ) - async def test_create_slas(self): + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_yaml_file_sla(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + with open("sla.yaml", "w") as f: + yaml.safe_dump({"sla": [sla_spec]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla sla.yaml -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_nested_yaml_file_sla(self, docker_work_pool, tmpdir): client = mock.AsyncMock() client.server_type = ServerType.CLOUD @@ -285,11 +541,235 @@ async def test_create_slas(self): "severity": "high", } - slas = _initialize_deployment_slas(None, [sla_spec]) - deployment_id = uuid4() + slas_file = tmpdir.mkdir("my_stuff") / "sla.yaml" + with open(slas_file, "w") as f: + yaml.safe_dump({"sla": [sla_spec]}, f) - await _create_slas(client, deployment_id, slas) + expected_slas = [ + TimeToCompletionSla( + name="test-sla", + duration=1800, + severity="high", + ) + ] - assert slas[0]._deployment_id == deployment_id - assert slas[0].owner_resource == f"prefect.deployment.{deployment_id}" - client.create_sla.assert_called_once_with(slas[0]) + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla my_stuff/sla.yaml -p {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_multiple_sla_flags(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + sla_spec_1 = { + "name": "test-sla-1", + "duration": 1800, + "severity": "high", + } + + sla_spec_2 = { + "name": "test-sla-2", + "duration": 3600, + "severity": "critical", + } + + with open("sla.yaml", "w") as f: + yaml.safe_dump({"sla": [sla_spec_2]}, f) + + expected_slas = [ + TimeToCompletionSla( + name="test-sla-1", + duration=1800, + severity="high", + ), + TimeToCompletionSla( + name="test-sla-2", + duration=3600, + severity="critical", + ), + ] + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 --sla" + f" '{json.dumps(sla_spec_1)}' --sla sla.yaml -p" + f" {docker_work_pool.name}" + ), + expected_code=0, + ) + + assert create_slas.call_count == 1 + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_override_on_sla_conflict(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + cli_sla_spec = { + "name": "cli-sla", + "duration": 1800, + "severity": "high", + } + + file_sla_spec = { + "name": "file-sla", + "duration": 1800, + "severity": "high", + } + + expected_slas = [ + TimeToCompletionSla( + name="cli-sla", + duration=1800, + severity="high", + ) + ] + + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + contents["deployments"] = [ + { + "name": "test-name-1", + "work_pool": { + "name": docker_work_pool.name, + }, + "slas": [ + file_sla_spec, + ], + } + ] + + with prefect_file.open(mode="w") as f: + yaml.safe_dump(contents, f) + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ) as create_slas: + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" --sla '{json.dumps(cli_sla_spec)}'" + ), + expected_code=0, + ) + + client, slas = create_slas.call_args[0] + assert isinstance(client, PrefectClient) + assert len(slas) == 1 + returned_deployment_id = slas[0]._deployment_id + for sla in expected_slas: + sla.set_deployment_id(returned_deployment_id) + assert slas == expected_slas + + @pytest.mark.usefixtures("project_dir") + async def test_invalid_sla_parsing(self, docker_work_pool): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + invalid_json_str_sla = "{foo: bar, baz: bat}" + invalid_yaml_sla = "invalid.yaml" + + with open(invalid_yaml_sla, "w") as f: + f.write("pretty please, let me know if the flow runs for too long") + + for invalid_sla in [invalid_json_str_sla, invalid_yaml_sla]: + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ): + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1" + f" -p {docker_work_pool.name} --sla '{invalid_sla}'" + ), + expected_code=1, + expected_output_contains=["Failed to parse SLA"], + ) + + @pytest.mark.usefixtures("interactive_console", "project_dir") + async def test_slas_saved_to_prefect_yaml( + self, + docker_work_pool, + ): + client = mock.AsyncMock() + client.server_type = ServerType.CLOUD + + cli_sla_spec = { + "name": "test-sla", + "duration": 1800, + "severity": "high", + } + + with mock.patch( + "prefect.cli.deploy._create_slas", + mock.AsyncMock(), + ): + await run_sync_in_worker_thread( + invoke_and_assert, + command=( + "deploy ./flows/hello.py:my_flow -n test-name-1 -p" + f" {docker_work_pool.name} --sla" + f" '{json.dumps(cli_sla_spec)}'" + ), + user_input=( + # Decline schedule + "n" + + readchar.key.ENTER + # Decline docker build + + "n" + + readchar.key.ENTER + # Accept save configuration + + "y" + + readchar.key.ENTER + ), + expected_code=0, + ) + + # Read the updated prefect.yaml + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + contents = yaml.safe_load(f) + + assert "deployments" in contents + assert "sla" in contents["deployments"][-1] + assert contents["deployments"][-1]["sla"] == [cli_sla_spec] From 12946ffc076db0c1cb402fa9000a74f85e4a6d65 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 12:43:28 -0500 Subject: [PATCH 4/7] remove old test --- tests/experimental/test_sla.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/tests/experimental/test_sla.py b/tests/experimental/test_sla.py index ea1885055d36..50779009a4f6 100644 --- a/tests/experimental/test_sla.py +++ b/tests/experimental/test_sla.py @@ -110,16 +110,6 @@ async def test_create_sla(self): sla.set_deployment_id(deployment_id) assert sla.owner_resource == f"prefect.deployment.{deployment_id}" - async def test_model_dump_fails_if_deployment_id_is_not_set(self): - sla = ServiceLevelAgreement( - name="test-sla", - ) - with pytest.raises( - ValueError, - match="Deployment ID is not set. Please set using `set_deployment_id`.", - ): - sla.model_dump() - class TestClientCreateSla: async def test_create_sla_against_cloud(self): From 62039fba7d979ec7c2fa5ed19a234423e5d92419 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 12:46:20 -0500 Subject: [PATCH 5/7] update typing for owner_resource --- src/prefect/_experimental/sla.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/_experimental/sla.py b/src/prefect/_experimental/sla.py index 9d15e1fdb4bc..d8d84e63ca52 100644 --- a/src/prefect/_experimental/sla.py +++ b/src/prefect/_experimental/sla.py @@ -32,7 +32,7 @@ def set_deployment_id(self, deployment_id: UUID): @computed_field @property - def owner_resource(self) -> str | None: + def owner_resource(self) -> Union[str, None]: if self._deployment_id: return f"prefect.deployment.{self._deployment_id}" return None From 9bc67c8645e66248dbcca9aa1ac4a6de9e2b6fd0 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 13:05:41 -0500 Subject: [PATCH 6/7] update string parsing for python compatibility --- src/prefect/cli/deploy.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index 6478be9e1a55..13732ec2c05a 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -1813,10 +1813,10 @@ async def _create_slas( await client.create_sla(sla) except Exception as e: app.console.print( - f"Failed to create SLA: {sla.get("name")}. Error: {str(e)}", + f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""", style="red", ) - exceptions.append((f"Failed to create SLA: {sla.get('name')}", e)) + exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e)) if exceptions: raise ValueError("Failed to create one or more SLAs.", exceptions) else: From 6943dd2ecf8e612fb8078fb91ce907ba2f854035 Mon Sep 17 00:00:00 2001 From: Dylan Hughes Date: Thu, 2 Jan 2025 13:15:29 -0500 Subject: [PATCH 7/7] update types for backwards compatibility --- src/prefect/cli/deploy.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index 13732ec2c05a..b580ea991482 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -1758,8 +1758,8 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict): def _gather_deployment_sla_definitions( - sla_flags: List[str] | None, existing_slas: List[Dict[str, Any]] | None -) -> List[Dict[str, Any]] | None: + sla_flags: Union[List[str], None], existing_slas: Union[List[Dict[str, Any]], None] +) -> Union[List[Dict[str, Any]], None]: """Parses SLA flags from CLI and existing deployment config in `prefect.yaml`. Prefers CLI-provided SLAs over config in `prefect.yaml`. """ @@ -1784,7 +1784,7 @@ def _gather_deployment_sla_definitions( def _initialize_deployment_slas( deployment_id: UUID, sla_specs: List[Dict[str, Any]] -) -> List[SlaTypes] | None: +) -> Union[List[SlaTypes], None]: """Initializes SLAs for a deployment. Args: