Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Automations CRUD methods in client #16579

Merged
merged 6 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 6 additions & 125 deletions src/prefect/client/orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
DeploymentClient,
DeploymentAsyncClient,
)
from prefect.client.orchestration._automations.client import (
AutomationClient,
AutomationAsyncClient,
)

import prefect
import prefect.exceptions
Expand Down Expand Up @@ -247,6 +251,7 @@ class PrefectClient(
VariableAsyncClient,
ConcurrencyLimitAsyncClient,
DeploymentAsyncClient,
AutomationAsyncClient,
):
"""
An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down Expand Up @@ -2270,131 +2275,6 @@ async def delete_flow_run_input(self, flow_run_id: UUID, key: str) -> None:
response = await self._client.delete(f"/flow_runs/{flow_run_id}/input/{key}")
response.raise_for_status()

async def create_automation(self, automation: AutomationCore) -> UUID:
"""Creates an automation in Prefect Cloud."""
response = await self._client.post(
"/automations/",
json=automation.model_dump(mode="json"),
)

return UUID(response.json()["id"])

async def update_automation(
self, automation_id: UUID, automation: AutomationCore
) -> None:
"""Updates an automation in Prefect Cloud."""
response = await self._client.put(
f"/automations/{automation_id}",
json=automation.model_dump(mode="json", exclude_unset=True),
)
response.raise_for_status

async def read_automations(self) -> list[Automation]:
response = await self._client.post("/automations/filter")
response.raise_for_status()
return pydantic.TypeAdapter(list[Automation]).validate_python(response.json())

async def find_automation(
self, id_or_name: Union[str, UUID]
) -> Optional[Automation]:
if isinstance(id_or_name, str):
name = id_or_name
try:
id = UUID(id_or_name)
except ValueError:
id = None
else:
id = id_or_name
name = str(id)

if id:
try:
automation = await self.read_automation(id)
return automation
except prefect.exceptions.HTTPStatusError as e:
if e.response.status_code == status.HTTP_404_NOT_FOUND:
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e

automations = await self.read_automations()

# Look for it by an exact name
for automation in automations:
if automation.name == name:
return automation

# Look for it by a case-insensitive name
for automation in automations:
if automation.name.lower() == name.lower():
return automation

return None

async def read_automation(
self, automation_id: Union[UUID, str]
) -> Optional[Automation]:
response = await self._client.get(f"/automations/{automation_id}")
if response.status_code == 404:
return None
response.raise_for_status()
return Automation.model_validate(response.json())

async def read_automations_by_name(self, name: str) -> list[Automation]:
"""
Query the Prefect API for an automation by name. Only automations matching the provided name will be returned.

Args:
name: the name of the automation to query

Returns:
a list of Automation model representations of the automations
"""
automation_filter = filters.AutomationFilter(
name=filters.AutomationFilterName(any_=[name])
)

response = await self._client.post(
"/automations/filter",
json={
"sort": sorting.AutomationSort.UPDATED_DESC,
"automations": automation_filter.model_dump(mode="json")
if automation_filter
else None,
},
)

response.raise_for_status()

return pydantic.TypeAdapter(list[Automation]).validate_python(response.json())

async def pause_automation(self, automation_id: UUID) -> None:
response = await self._client.patch(
f"/automations/{automation_id}", json={"enabled": False}
)
response.raise_for_status()

async def resume_automation(self, automation_id: UUID) -> None:
response = await self._client.patch(
f"/automations/{automation_id}", json={"enabled": True}
)
response.raise_for_status()

async def delete_automation(self, automation_id: UUID) -> None:
response = await self._client.delete(f"/automations/{automation_id}")
if response.status_code == 404:
return

response.raise_for_status()

async def read_resource_related_automations(
self, resource_id: str
) -> list[Automation]:
response = await self._client.get(f"/automations/related-to/{resource_id}")
response.raise_for_status()
return pydantic.TypeAdapter(list[Automation]).validate_python(response.json())

async def delete_resource_owned_automations(self, resource_id: str) -> None:
await self._client.delete(f"/automations/owned-by/{resource_id}")

async def api_version(self) -> str:
res = await self._client.get("/admin/version")
return res.json()
Expand Down Expand Up @@ -2510,6 +2390,7 @@ class SyncPrefectClient(
VariableClient,
ConcurrencyLimitClient,
DeploymentClient,
AutomationClient,
):
"""
A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/).
Expand Down
Empty file.
Loading
Loading