Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deployment SLA Definitions #16574

Merged
merged 22 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
54e3d07
SLA: First Pass at SLA Schemas and flow.deploy Update (#16455)
dylanbhughes Dec 31, 2024
6cbc709
Merge branch 'main' of github.com:PrefectHQ/prefect into dylanbhughes…
dylanbhughes Jan 2, 2025
5389c71
SLA Support in CLI (#16558)
dylanbhughes Jan 2, 2025
714d94a
Merge branch 'main' into dylanbhughes/deployment-sla
dylanbhughes Jan 2, 2025
aa2163e
trying out new types
dylanbhughes Jan 3, 2025
28bcfc3
more new types
dylanbhughes Jan 3, 2025
6881391
Merge branch 'main' of github.com:PrefectHQ/prefect into dylanbhughes…
dylanbhughes Jan 3, 2025
b86726d
remove old create_sla method
dylanbhughes Jan 3, 2025
151e2ba
remove unused import
dylanbhughes Jan 3, 2025
47a5d79
Merge branch 'main' of github.com:PrefectHQ/prefect into dylanbhughes…
dylanbhughes Jan 3, 2025
b38fd98
add annotations to flow
dylanbhughes Jan 3, 2025
89c7b98
Merge branch 'main' of github.com:PrefectHQ/prefect into dylanbhughes…
dylanbhughes Jan 3, 2025
649216c
remove | None type
dylanbhughes Jan 3, 2025
9ed3ee7
update to avoid pydantic bug
dylanbhughes Jan 3, 2025
65678a3
Merge branch 'main' of github.com:PrefectHQ/prefect into dylanbhughes…
dylanbhughes Jan 3, 2025
ae9a1b8
mark kwarg as experimental
dylanbhughes Jan 3, 2025
9c97344
update docstrings
dylanbhughes Jan 3, 2025
b0163a0
update RunnerDeployment.sla -> ._sla
dylanbhughes Jan 3, 2025
44fc01f
use ExceptionGroup
dylanbhughes Jan 3, 2025
b0292dd
update from sla -> _sla
dylanbhughes Jan 3, 2025
46648f5
Merge branch 'main' into dylanbhughes/deployment-sla
dylanbhughes Jan 3, 2025
c1ec262
remove add_note as it's incompatible with 3.9
dylanbhughes Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
66 changes: 66 additions & 0 deletions src/prefect/_experimental/sla/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from prefect.client.orchestration.base import BaseAsyncClient, BaseClient

if TYPE_CHECKING:
from uuid import UUID

from prefect._experimental.sla.objects import SlaTypes


class SlaClient(BaseClient):
def create_sla(self, sla: "SlaTypes") -> "UUID":
"""
Creates a service level agreement.
Args:
sla: The SLA to create. Must have a deployment ID set.
Raises:
httpx.RequestError: if the SLA was not created for any reason
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = self.request(
"POST",
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
)
response.raise_for_status()

from uuid import UUID

return UUID(response.json().get("id"))


class SlaAsyncClient(BaseAsyncClient):
async def create_sla(self, sla: "SlaTypes") -> "UUID":
"""
Creates a service level agreement.
Args:
sla: The SLA to create. Must have a deployment ID set.
Raises:
httpx.RequestError: if the SLA was not created for any reason
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = await self.request(
"POST",
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
)
response.raise_for_status()

from uuid import UUID

return UUID(response.json().get("id"))
53 changes: 53 additions & 0 deletions src/prefect/_experimental/sla/objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import annotations

import abc
from typing import Literal, Optional, Union
from uuid import UUID

from pydantic import Field, PrivateAttr, computed_field
from typing_extensions import TypeAlias

from prefect._internal.schemas.bases import PrefectBaseModel


class ServiceLevelAgreement(PrefectBaseModel, abc.ABC):
"""An ORM representation of a Service Level Agreement."""

_deployment_id: Optional[UUID] = PrivateAttr(default=None)

name: str = Field(
default=...,
description="The name of the SLA. Names must be unique on a per-deployment basis.",
)
severity: Literal["minor", "low", "moderate", "high", "critical"] = Field(
default="moderate",
description="The severity of the SLA.",
)
enabled: Optional[bool] = Field(
default=True,
description="Whether the SLA is enabled.",
)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
return self

@computed_field
@property
def owner_resource(self) -> Union[str, None]:
if self._deployment_id:
return f"prefect.deployment.{self._deployment_id}"
return None


class TimeToCompletionSla(ServiceLevelAgreement):
"""An SLA that triggers when a flow run takes longer than the specified duration."""

duration: int = Field(
default=...,
description="The maximum flow run duration allowed before the SLA is violated, expressed in seconds.",
)


# Concrete SLA types
SlaTypes: TypeAlias = Union[TimeToCompletionSla]
88 changes: 88 additions & 0 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Module containing implementation for deploying flows."""

from __future__ import annotations

import json
import os
import re
Expand All @@ -19,6 +21,7 @@
from yaml.error import YAMLError

import prefect
from prefect._experimental.sla.objects import SlaTypes
from prefect._internal.compatibility.deprecated import (
generate_deprecation_message,
)
Expand All @@ -40,6 +43,7 @@
exit_with_error,
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
Expand Down Expand Up @@ -350,6 +354,12 @@ async def deploy(
"--prefect-file",
help="Specify a custom path to a prefect.yaml file",
),
sla: List[str] = typer.Option(
None,
"--sla",
help="Experimental: One or more SLA configurations for the deployment. May be"
" removed or modified at any time. Currently only supported on Prefect Cloud.",
),
):
"""
Create a deployment to deploy a flow from this project.
Expand Down Expand Up @@ -405,6 +415,7 @@ async def deploy(
"triggers": trigger,
"param": param,
"params": params,
"sla": sla,
}
try:
deploy_configs, actions = _load_deploy_configs_and_actions(
Expand Down Expand Up @@ -734,6 +745,14 @@ async def _run_single_deploy(

await _create_deployment_triggers(client, deployment_id, triggers)

if sla_specs := _gather_deployment_sla_definitions(
options.get("sla"), deploy_config.get("sla")
dylanbhughes marked this conversation as resolved.
Show resolved Hide resolved
):
slas = _initialize_deployment_slas(deployment_id, sla_specs)
await _create_slas(client, slas)
else:
slas = []

app.console.print(
Panel(
f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
Expand Down Expand Up @@ -791,6 +810,7 @@ async def _run_single_deploy(
push_steps=push_steps or None,
pull_steps=pull_steps or None,
triggers=trigger_specs or None,
sla=sla_specs or None,
prefect_file=prefect_file,
)
app.console.print(
Expand Down Expand Up @@ -1737,3 +1757,71 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict):
)

return deploy_config


def _gather_deployment_sla_definitions(
sla_flags: Union[list[str], None], existing_slas: Union[list[dict[str, Any]], None]
) -> Union[list[dict[str, Any]], None]:
"""Parses SLA flags from CLI and existing deployment config in `prefect.yaml`.
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags:
sla_specs = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
with open(s, "r") as f:
sla_specs.extend(yaml.safe_load(f).get("sla", []))
elif s.endswith(".json"):
with open(s, "r") as f:
sla_specs.extend(json.load(f).get("sla", []))
else:
sla_specs.append(json.loads(s))
except Exception as e:
raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}")
return sla_specs

return existing_slas


def _initialize_deployment_slas(
deployment_id: UUID, sla_specs: list[dict[str, Any]]
) -> list[SlaTypes]:
"""Initializes SLAs for a deployment.

Args:
deployment_id: Deployment ID.
sla_specs: SLA specification dictionaries.

Returns:
List of SLAs.
"""
slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)

return slas


async def _create_slas(
client: "PrefectClient",
slas: List[SlaTypes],
):
if client.server_type == ServerType.CLOUD:
exceptions = []
for sla in slas:
try:
await client.create_sla(sla)
except Exception as e:
app.console.print(
f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""",
style="red",
)
exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e))
if exceptions:
raise ValueError("Failed to create one or more SLAs.", exceptions)
else:
raise ValueError(
"SLA configuration is currently only supported on Prefect Cloud."
)
3 changes: 3 additions & 0 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
AutomationClient,
AutomationAsyncClient,
)
from prefect._experimental.sla.client import SlaClient, SlaAsyncClient

from prefect.client.orchestration._flows.client import (
FlowClient,
Expand Down Expand Up @@ -249,6 +250,7 @@ class PrefectClient(
ConcurrencyLimitAsyncClient,
DeploymentAsyncClient,
AutomationAsyncClient,
SlaAsyncClient,
FlowRunAsyncClient,
FlowAsyncClient,
):
Expand Down Expand Up @@ -1863,6 +1865,7 @@ class SyncPrefectClient(
ConcurrencyLimitClient,
DeploymentClient,
AutomationClient,
SlaClient,
FlowRunClient,
FlowClient,
):
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/deployments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
To get started, follow along with [the deloyments tutorial](/tutorials/deployments/).
"""

from __future__ import annotations

import os
from copy import deepcopy
from pathlib import Path
Expand Down Expand Up @@ -275,6 +277,7 @@ def _save_deployment_to_prefect_file(
push_steps: Optional[List[Dict]] = None,
pull_steps: Optional[List[Dict]] = None,
triggers: Optional[List[Dict]] = None,
sla: Optional[list[dict]] = None,
prefect_file: Path = Path("prefect.yaml"),
):
"""
Expand Down Expand Up @@ -319,6 +322,9 @@ def _save_deployment_to_prefect_file(
if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
deployment["triggers"] = triggers

if sla and sla != parsed_prefect_file_contents.get("sla"):
deployment["sla"] = sla

deployments = parsed_prefect_file_contents.get("deployments")
if deployments is None:
parsed_prefect_file_contents["deployments"] = [deployment]
Expand Down
Loading
Loading