Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SLA Support in CLI #16558

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/prefect/_experimental/sla.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ class ServiceLevelAgreement(PrefectBaseModel, abc.ABC):

def set_deployment_id(self, deployment_id: UUID):
self._deployment_id = deployment_id
return self

@computed_field
@property
def owner_resource(self) -> str:
if not self._deployment_id:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)
return f"prefect.deployment.{self._deployment_id}"
def owner_resource(self) -> Union[str, None]:
if self._deployment_id:
return f"prefect.deployment.{self._deployment_id}"
return None


class TimeToCompletionSla(ServiceLevelAgreement):
Expand Down
86 changes: 86 additions & 0 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from yaml.error import YAMLError

import prefect
from prefect._experimental.sla import SlaTypes
from prefect._internal.compatibility.deprecated import (
generate_deprecation_message,
)
Expand All @@ -40,6 +41,7 @@
exit_with_error,
)
from prefect.cli.root import app, is_interactive
from prefect.client.base import ServerType
from prefect.client.schemas.actions import DeploymentScheduleCreate
from prefect.client.schemas.filters import WorkerFilter
from prefect.client.schemas.objects import ConcurrencyLimitConfig
Expand Down Expand Up @@ -350,6 +352,12 @@ async def deploy(
"--prefect-file",
help="Specify a custom path to a prefect.yaml file",
),
sla: List[str] = typer.Option(
None,
"--sla",
help="Experimental: One or more SLA configurations for the deployment. May be"
" removed or modified at any time. Currently only supported on Prefect Cloud.",
),
):
"""
Create a deployment to deploy a flow from this project.
Expand Down Expand Up @@ -405,6 +413,7 @@ async def deploy(
"triggers": trigger,
"param": param,
"params": params,
"sla": sla,
}
try:
deploy_configs, actions = _load_deploy_configs_and_actions(
Expand Down Expand Up @@ -734,6 +743,14 @@ async def _run_single_deploy(

await _create_deployment_triggers(client, deployment_id, triggers)

if sla_specs := _gather_deployment_sla_definitions(
options.get("sla"), deploy_config.get("sla")
):
slas = _initialize_deployment_slas(deployment_id, sla_specs)
await _create_slas(client, slas)
else:
slas = []

app.console.print(
Panel(
f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
Expand Down Expand Up @@ -791,6 +808,7 @@ async def _run_single_deploy(
push_steps=push_steps or None,
pull_steps=pull_steps or None,
triggers=trigger_specs or None,
sla=sla_specs or None,
prefect_file=prefect_file,
)
app.console.print(
Expand Down Expand Up @@ -1737,3 +1755,71 @@ def _handle_deprecated_schedule_fields(deploy_config: Dict):
)

return deploy_config


def _gather_deployment_sla_definitions(
sla_flags: Union[List[str], None], existing_slas: Union[List[Dict[str, Any]], None]
) -> Union[List[Dict[str, Any]], None]:
"""Parses SLA flags from CLI and existing deployment config in `prefect.yaml`.
Prefers CLI-provided SLAs over config in `prefect.yaml`.
"""
if sla_flags:
sla_specs = []
for s in sla_flags:
try:
if s.endswith(".yaml"):
with open(s, "r") as f:
sla_specs.extend(yaml.safe_load(f).get("sla", []))
elif s.endswith(".json"):
with open(s, "r") as f:
sla_specs.extend(json.load(f).get("sla", []))
else:
sla_specs.append(json.loads(s))
except Exception as e:
raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}")
return sla_specs

return existing_slas


def _initialize_deployment_slas(
deployment_id: UUID, sla_specs: List[Dict[str, Any]]
) -> Union[List[SlaTypes], None]:
"""Initializes SLAs for a deployment.

Args:
deployment_id: Deployment ID.
sla_specs: SLA specification dictionaries.

Returns:
List of SLAs.
"""
slas = [pydantic.TypeAdapter(SlaTypes).validate_python(spec) for spec in sla_specs]

for sla in slas:
sla.set_deployment_id(deployment_id)

return slas


async def _create_slas(
client: "PrefectClient",
slas: List[SlaTypes],
):
if client.server_type == ServerType.CLOUD:
exceptions = []
for sla in slas:
try:
await client.create_sla(sla)
except Exception as e:
app.console.print(
f"""Failed to create SLA: {sla.get("name")}. Error: {str(e)}""",
style="red",
)
exceptions.append((f"""Failed to create SLA: {sla.get('name')}""", e))
if exceptions:
raise ValueError("Failed to create one or more SLAs.", exceptions)
else:
raise ValueError(
"SLA configuration is currently only supported on Prefect Cloud."
)
5 changes: 5 additions & 0 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2967,6 +2967,11 @@ async def create_sla(self, sla: SlaTypes) -> UUID:
Returns:
the ID of the SLA in the backend
"""
if not sla.owner_resource:
raise ValueError(
"Deployment ID is not set. Please set using `set_deployment_id`."
)

response = await self._client.post(
"/slas/",
json=sla.model_dump(mode="json", exclude_unset=True),
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/deployments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def _save_deployment_to_prefect_file(
push_steps: Optional[List[Dict]] = None,
pull_steps: Optional[List[Dict]] = None,
triggers: Optional[List[Dict]] = None,
sla: Optional[List[Dict]] = None,
prefect_file: Path = Path("prefect.yaml"),
):
"""
Expand Down Expand Up @@ -319,6 +320,9 @@ def _save_deployment_to_prefect_file(
if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
deployment["triggers"] = triggers

if sla and sla != parsed_prefect_file_contents.get("sla"):
deployment["sla"] = sla

deployments = parsed_prefect_file_contents.get("deployments")
if deployments is None:
parsed_prefect_file_contents["deployments"] = [deployment]
Expand Down
Loading
Loading