Skip to content

Commit

Permalink
Deployment SLA Definitions (#16574)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Streed <[email protected]>
  • Loading branch information
dylanbhughes and desertaxle authored Jan 3, 2025
1 parent bce7345 commit 0f2530c
Show file tree
Hide file tree
Showing 9 changed files with 1,033 additions and 2 deletions.
Empty file.
66 changes: 66 additions & 0 deletions src/prefect/_experimental/sla/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from prefect.client.orchestration.base import BaseAsyncClient, BaseClient

if TYPE_CHECKING:
from uuid import UUID

from prefect._experimental.sla.objects import SlaTypes


class SlaClient(BaseClient):
def create_sla(self, sla: "SlaTypes") -> "UUID":
"""
Creates a service level agreement.
Args:
sla: The SLA to create. Must have a deployment ID set.
Raises:
httpx.RequestError: if the SLA was not created for any reason
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = self.request(
"POST",
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
)
response.raise_for_status()

from uuid import UUID

return UUID(response.json().get("id"))


class SlaAsyncClient(BaseAsyncClient):
async def create_sla(self, sla: "SlaTypes") -> "UUID":
"""
Creates a service level agreement.
Args:
sla: The SLA to create. Must have a deployment ID set.
Raises:
httpx.RequestError: if the SLA was not created for any reason
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = await self.request(
"POST",
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
)
response.raise_for_status()

from uuid import UUID

return UUID(response.json().get("id"))
53 changes: 53 additions & 0 deletions src/prefect/_experimental/sla/objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import annotations

import abc
from typing import Literal, Optional, Union
from uuid import UUID

from pydantic import Field, PrivateAttr, computed_field
from typing_extensions import TypeAlias

from prefect._internal.schemas.bases import PrefectBaseModel


class ServiceLevelAgreement(PrefectBaseModel, abc.ABC):
"""An ORM representation of a Service Level Agreement."""

_deployment_id: Optional[UUID] = PrivateAttr(default=None)

name: str = Field(
default=...,
description="The name of the SLA. Names must be unique on a per-deployment basis.",
)
severity: Literal["minor", "low", "moderate", "high", "critical"] = Field(
default="moderate",
description="The severity of the SLA.",
)
enabled: Optional[bool] = Field(
default=True,
description="Whether the SLA is enabled.",
)

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
return self

@computed_field
@property
def owner_resource(self) -> Union[str, None]:
if self._deployment_id:
return f"prefect.deployment.{self._deployment_id}"
return None


class TimeToCompletionSla(ServiceLevelAgreement):
"""An SLA that triggers when a flow run takes longer than the specified duration."""

duration: int = Field(
default=...,
description="The maximum flow run duration allowed before the SLA is violated, expressed in seconds.",
)


# Concrete SLA types
SlaTypes: TypeAlias = Union[TimeToCompletionSla]
88 changes: 88 additions & 0 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Module containing implementation for deploying flows."""

from __future__ import annotations

import json
import os
import re
Expand All @@ -19,6 +21,7 @@
from yaml.error import YAMLError

import prefect
from prefect._experimental.sla.objects import SlaTypes
from prefect._internal.compatibility.deprecated import (
generate_deprecation_message,
)
Expand All @@ -40,6 +43,7 @@
exit_with_error,
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
Expand Down Expand Up @@ -350,6 +354,12 @@ async def deploy(
"--prefect-file",
help="Specify a custom path to a prefect.yaml file",
),
sla: List[str] = typer.Option(
None,
"--sla",
help="Experimental: One or more SLA configurations for the deployment. May be"
" removed or modified at any time. Currently only supported on Prefect Cloud.",
),
):
"""
Create a deployment to deploy a flow from this project.
Expand Down Expand Up @@ -405,6 +415,7 @@ async def deploy(
"triggers": trigger,
"param": param,
"params": params,
"sla": sla,
}
try:
deploy_configs, actions = _load_deploy_configs_and_actions(
Expand Down Expand Up @@ -734,6 +745,14 @@ async def _run_single_deploy(

await _create_deployment_triggers(client, deployment_id, triggers)

if sla_specs := _gather_deployment_sla_definitions(
options.get("sla"), deploy_config.get("sla")
):
slas = _initialize_deployment_slas(deployment_id, sla_specs)
await _create_slas(client, slas)
else:
slas = []

app.console.print(
Panel(
f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
Expand Down Expand Up @@ -791,6 +810,7 @@ async def _run_single_deploy(
push_steps=push_steps or None,
pull_steps=pull_steps or None,
triggers=trigger_specs or None,
sla=sla_specs or None,
prefect_file=prefect_file,
)
app.console.print(
Expand Down Expand Up @@ -1737,3 +1757,71 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict):
)

return deploy_config


def _gather_deployment_sla_definitions(
sla_flags: Union[list[str], None], existing_slas: Union[list[dict[str, Any]], None]
) -> Union[list[dict[str, Any]], None]:
"""Parses SLA flags from CLI and existing deployment config in `prefect.yaml`.
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags:
sla_specs = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
with open(s, "r") as f:
sla_specs.extend(yaml.safe_load(f).get("sla", []))
elif s.endswith(".json"):
with open(s, "r") as f:
sla_specs.extend(json.load(f).get("sla", []))
else:
sla_specs.append(json.loads(s))
except Exception as e:
raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}")
return sla_specs

return existing_slas


def _initialize_deployment_slas(
deployment_id: UUID, sla_specs: list[dict[str, Any]]
) -> list[SlaTypes]:
"""Initializes SLAs for a deployment.
Args:
deployment_id: Deployment ID.
sla_specs: SLA specification dictionaries.
Returns:
List of SLAs.
"""
slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)

return slas


async def _create_slas(
client: "PrefectClient",
slas: List[SlaTypes],
):
if client.server_type == ServerType.CLOUD:
exceptions = []
for sla in slas:
try:
await client.create_sla(sla)
except Exception as e:
app.console.print(
f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""",
style="red",
)
exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e))
if exceptions:
raise ValueError("Failed to create one or more SLAs.", exceptions)
else:
raise ValueError(
"SLA configuration is currently only supported on Prefect Cloud."
)
3 changes: 3 additions & 0 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
AutomationClient,
AutomationAsyncClient,
)
from prefect._experimental.sla.client import SlaClient, SlaAsyncClient

from prefect.client.orchestration._flows.client import (
FlowClient,
Expand Down Expand Up @@ -264,6 +265,7 @@ class PrefectClient(
ConcurrencyLimitAsyncClient,
DeploymentAsyncClient,
AutomationAsyncClient,
SlaAsyncClient,
FlowRunAsyncClient,
FlowAsyncClient,
BlocksDocumentAsyncClient,
Expand Down Expand Up @@ -1881,6 +1883,7 @@ class SyncPrefectClient(
ConcurrencyLimitClient,
DeploymentClient,
AutomationClient,
SlaClient,
FlowRunClient,
FlowClient,
BlocksDocumentClient,
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/deployments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
To get started, follow along with [the deloyments tutorial](/tutorials/deployments/).
"""

from __future__ import annotations

import os
from copy import deepcopy
from pathlib import Path
Expand Down Expand Up @@ -275,6 +277,7 @@ def _save_deployment_to_prefect_file(
push_steps: Optional[List[Dict]] = None,
pull_steps: Optional[List[Dict]] = None,
triggers: Optional[List[Dict]] = None,
sla: Optional[list[dict]] = None,
prefect_file: Path = Path("prefect.yaml"),
):
"""
Expand Down Expand Up @@ -319,6 +322,9 @@ def _save_deployment_to_prefect_file(
if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
deployment["triggers"] = triggers

if sla and sla != parsed_prefect_file_contents.get("sla"):
deployment["sla"] = sla

deployments = parsed_prefect_file_contents.get("deployments")
if deployments is None:
parsed_prefect_file_contents["deployments"] = [deployment]
Expand Down
Loading

0 comments on commit 0f2530c

Please sign in to comment.