diff --git a/docs/v3/api-ref/rest-api/server/schema.json b/docs/v3/api-ref/rest-api/server/schema.json
index 68127d6588a4..4777f570e1a5 100644
--- a/docs/v3/api-ref/rest-api/server/schema.json
+++ b/docs/v3/api-ref/rest-api/server/schema.json
@@ -12703,22 +12703,64 @@
       "Body_count_deployments_deployments_count_post": {
         "properties": {
           "flows": {
-            "$ref": "#/components/schemas/FlowFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "flow_runs": {
-            "$ref": "#/components/schemas/FlowRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "task_runs": {
-            "$ref": "#/components/schemas/TaskRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/TaskRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "deployments": {
-            "$ref": "#/components/schemas/DeploymentFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/DeploymentFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pools": {
-            "$ref": "#/components/schemas/WorkPoolFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkPoolFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pool_queues": {
-            "$ref": "#/components/schemas/WorkQueueFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkQueueFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           }
         },
         "type": "object",
@@ -13126,22 +13168,64 @@
             "default": 1
           },
           "flows": {
-            "$ref": "#/components/schemas/FlowFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "flow_runs": {
-            "$ref": "#/components/schemas/FlowRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "task_runs": {
-            "$ref": "#/components/schemas/TaskRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/TaskRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "deployments": {
-            "$ref": "#/components/schemas/DeploymentFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/DeploymentFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pools": {
-            "$ref": "#/components/schemas/WorkPoolFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkPoolFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pool_queues": {
-            "$ref": "#/components/schemas/WorkQueueFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkQueueFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "sort": {
             "$ref": "#/components/schemas/DeploymentSort",
@@ -13608,22 +13692,64 @@
             "default": 0
           },
           "flows": {
-            "$ref": "#/components/schemas/FlowFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "flow_runs": {
-            "$ref": "#/components/schemas/FlowRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/FlowRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "task_runs": {
-            "$ref": "#/components/schemas/TaskRunFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/TaskRunFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "deployments": {
-            "$ref": "#/components/schemas/DeploymentFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/DeploymentFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pools": {
-            "$ref": "#/components/schemas/WorkPoolFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkPoolFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "work_pool_queues": {
-            "$ref": "#/components/schemas/WorkQueueFilter"
+            "anyOf": [
+              {
+                "$ref": "#/components/schemas/WorkQueueFilter"
+              },
+              {
+                "type": "null"
+              }
+            ]
           },
           "sort": {
             "$ref": "#/components/schemas/DeploymentSort",
diff --git a/pyproject.toml b/pyproject.toml
index 3d866b122740..29d52b4bd4dd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,7 +39,6 @@ dependencies = [
     "pytz>=2021.1,<2026",
    "readchar>=4.0.0,<5.0.0",
     "sqlalchemy[asyncio]>=2.0,<3.0.0",
-    "starlette<0.46.0; python_version == '3.9'",
     "typer>=0.12.0,!=0.12.2,<0.16.0",
     # Client dependencies
     # If you modify this list, make the same modification in client/pyproject.toml
diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py
index 80779ae45810..ca8e9b8f1810 100644
--- a/src/prefect/server/api/deployments.py
+++ b/src/prefect/server/api/deployments.py
@@ -376,12 +376,12 @@ async def read_deployment(
 async def read_deployments(
     limit: int = dependencies.LimitBody(),
     offset: int = Body(0, ge=0),
-    flows: schemas.filters.FlowFilter = None,
-    flow_runs: schemas.filters.FlowRunFilter = None,
-    task_runs: schemas.filters.TaskRunFilter = None,
-    deployments: schemas.filters.DeploymentFilter = None,
-    work_pools: schemas.filters.WorkPoolFilter = None,
-    work_pool_queues: schemas.filters.WorkQueueFilter = None,
+    flows: Optional[schemas.filters.FlowFilter] = None,
+    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
+    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
+    deployments: Optional[schemas.filters.DeploymentFilter] = None,
+    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
+    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
     sort: schemas.sorting.DeploymentSort = Body(
         schemas.sorting.DeploymentSort.NAME_ASC
     ),
@@ -415,12 +415,12 @@
 async def paginate_deployments(
     limit: int = dependencies.LimitBody(),
     page: int = Body(1, ge=1),
-    flows: schemas.filters.FlowFilter = None,
-    flow_runs: schemas.filters.FlowRunFilter = None,
-    task_runs: schemas.filters.TaskRunFilter = None,
-    deployments: schemas.filters.DeploymentFilter = None,
-    work_pools: schemas.filters.WorkPoolFilter = None,
-    work_pool_queues: schemas.filters.WorkQueueFilter = None,
+    flows: Optional[schemas.filters.FlowFilter] = None,
+    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
+    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
+    deployments: Optional[schemas.filters.DeploymentFilter] = None,
+    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
+    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
     sort: schemas.sorting.DeploymentSort = Body(
         schemas.sorting.DeploymentSort.NAME_ASC
     ),
@@ -474,7 +474,7 @@ async def paginate_deployments(
 @router.post("/get_scheduled_flow_runs")
 async def get_scheduled_flow_runs_for_deployments(
     background_tasks: BackgroundTasks,
-    deployment_ids: List[UUID] = Body(
+    deployment_ids: list[UUID] = Body(
         default=..., description="The deployment IDs to get scheduled runs for"
     ),
     scheduled_before: DateTime = Body(
@@ -482,7 +482,7 @@
     ),
     limit: int = dependencies.LimitBody(),
     db: PrefectDBInterface = Depends(provide_database_interface),
-) -> List[schemas.responses.FlowRunResponse]:
+) -> list[schemas.responses.FlowRunResponse]:
     """
     Get scheduled runs for a set of deployments. Used by a runner to poll for work.
""" @@ -515,6 +515,7 @@ async def get_scheduled_flow_runs_for_deployments( background_tasks.add_task( mark_deployments_ready, + db=db, deployment_ids=deployment_ids, ) @@ -523,12 +524,12 @@ async def get_scheduled_flow_runs_for_deployments( @router.post("/count") async def count_deployments( - flows: schemas.filters.FlowFilter = None, - flow_runs: schemas.filters.FlowRunFilter = None, - task_runs: schemas.filters.TaskRunFilter = None, - deployments: schemas.filters.DeploymentFilter = None, - work_pools: schemas.filters.WorkPoolFilter = None, - work_pool_queues: schemas.filters.WorkQueueFilter = None, + flows: Optional[schemas.filters.FlowFilter] = None, + flow_runs: Optional[schemas.filters.FlowRunFilter] = None, + task_runs: Optional[schemas.filters.TaskRunFilter] = None, + deployments: Optional[schemas.filters.DeploymentFilter] = None, + work_pools: Optional[schemas.filters.WorkPoolFilter] = None, + work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, db: PrefectDBInterface = Depends(provide_database_interface), ) -> int: """ diff --git a/src/prefect/server/api/work_queues.py b/src/prefect/server/api/work_queues.py index 903cb02e53f3..9d1a2af19946 100644 --- a/src/prefect/server/api/work_queues.py +++ b/src/prefect/server/api/work_queues.py @@ -164,6 +164,7 @@ async def read_work_queue_runs( background_tasks.add_task( mark_work_queues_ready, + db=db, polled_work_queue_ids=[work_queue_id], ready_work_queue_ids=( [work_queue_id] if work_queue.status == WorkQueueStatus.NOT_READY else [] @@ -172,6 +173,7 @@ async def read_work_queue_runs( background_tasks.add_task( mark_deployments_ready, + db=db, work_queue_ids=[work_queue_id], ) diff --git a/src/prefect/server/api/workers.py b/src/prefect/server/api/workers.py index ecc397b10193..bc27d2823dda 100644 --- a/src/prefect/server/api/workers.py +++ b/src/prefect/server/api/workers.py @@ -375,6 +375,7 @@ async def get_scheduled_flow_runs( background_tasks.add_task( mark_work_queues_ready, + db=db, polled_work_queue_ids=[ wq.id for wq in work_queues if wq.status != WorkQueueStatus.NOT_READY ], @@ -385,6 +386,7 @@ async def get_scheduled_flow_runs( background_tasks.add_task( mark_deployments_ready, + db=db, work_queue_ids=[wq.id for wq in work_queues], ) diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index c8353f91d285..8dfceb344acd 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -6,6 +6,7 @@ from __future__ import annotations import datetime +import logging from collections.abc import Iterable, Sequence from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast from uuid import UUID, uuid4 @@ -16,6 +17,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.sql import Select +from prefect.logging import get_logger from prefect.server import models, schemas from prefect.server.database import PrefectDBInterface, db_injector, orm_models from prefect.server.events.clients import PrefectServerEventsClient @@ -32,6 +34,8 @@ T = TypeVar("T", bound=tuple[Any, ...]) +logger: logging.Logger = get_logger("prefect.server.models.deployments") + @db_injector async def _delete_scheduled_runs( @@ -1052,58 +1056,60 @@ async def delete_deployment_schedule( return result.rowcount > 0 -@db_injector async def mark_deployments_ready( db: PrefectDBInterface, deployment_ids: Optional[Iterable[UUID]] = None, work_queue_ids: Optional[Iterable[UUID]] = None, ) -> None: - deployment_ids = deployment_ids or [] - work_queue_ids = 
work_queue_ids or [] + try: + deployment_ids = deployment_ids or [] + work_queue_ids = work_queue_ids or [] - if not deployment_ids and not work_queue_ids: - return + if not deployment_ids and not work_queue_ids: + return - async with db.session_context( - begin_transaction=True, - ) as session: - result = await session.execute( - select(db.Deployment.id).where( - sa.or_( - db.Deployment.id.in_(deployment_ids), - db.Deployment.work_queue_id.in_(work_queue_ids), - ), - db.Deployment.status == DeploymentStatus.NOT_READY, + async with db.session_context( + begin_transaction=True, + ) as session: + result = await session.execute( + select(db.Deployment.id).where( + sa.or_( + db.Deployment.id.in_(deployment_ids), + db.Deployment.work_queue_id.in_(work_queue_ids), + ), + db.Deployment.status == DeploymentStatus.NOT_READY, + ) ) - ) - unready_deployments = list(result.scalars().unique().all()) + unready_deployments = list(result.scalars().unique().all()) - last_polled = now("UTC") + last_polled = now("UTC") - await session.execute( - sa.update(db.Deployment) - .where( - sa.or_( - db.Deployment.id.in_(deployment_ids), - db.Deployment.work_queue_id.in_(work_queue_ids), + await session.execute( + sa.update(db.Deployment) + .where( + sa.or_( + db.Deployment.id.in_(deployment_ids), + db.Deployment.work_queue_id.in_(work_queue_ids), + ) ) + .values(status=DeploymentStatus.READY, last_polled=last_polled) ) - .values(status=DeploymentStatus.READY, last_polled=last_polled) - ) - if not unready_deployments: - return - - async with PrefectServerEventsClient() as events: - for deployment_id in unready_deployments: - await events.emit( - await deployment_status_event( - session=session, - deployment_id=deployment_id, - status=DeploymentStatus.READY, - occurred=last_polled, + if not unready_deployments: + return + + async with PrefectServerEventsClient() as events: + for deployment_id in unready_deployments: + await events.emit( + await deployment_status_event( + session=session, + deployment_id=deployment_id, + status=DeploymentStatus.READY, + occurred=last_polled, + ) ) - ) + except Exception as exc: + logger.error(f"Error marking deployments as ready: {exc}", exc_info=True) @db_injector @@ -1112,50 +1118,53 @@ async def mark_deployments_not_ready( deployment_ids: Optional[Iterable[UUID]] = None, work_queue_ids: Optional[Iterable[UUID]] = None, ) -> None: - deployment_ids = deployment_ids or [] - work_queue_ids = work_queue_ids or [] + try: + deployment_ids = deployment_ids or [] + work_queue_ids = work_queue_ids or [] - if not deployment_ids and not work_queue_ids: - return + if not deployment_ids and not work_queue_ids: + return - async with db.session_context( - begin_transaction=True, - ) as session: - result = await session.execute( - select(db.Deployment.id).where( - sa.or_( - db.Deployment.id.in_(deployment_ids), - db.Deployment.work_queue_id.in_(work_queue_ids), - ), - db.Deployment.status == DeploymentStatus.READY, + async with db.session_context( + begin_transaction=True, + ) as session: + result = await session.execute( + select(db.Deployment.id).where( + sa.or_( + db.Deployment.id.in_(deployment_ids), + db.Deployment.work_queue_id.in_(work_queue_ids), + ), + db.Deployment.status == DeploymentStatus.READY, + ) ) - ) - ready_deployments = list(result.scalars().unique().all()) - - await session.execute( - sa.update(db.Deployment) - .where( - sa.or_( - db.Deployment.id.in_(deployment_ids), - db.Deployment.work_queue_id.in_(work_queue_ids), + ready_deployments = list(result.scalars().unique().all()) + 
+            await session.execute(
+                sa.update(db.Deployment)
+                .where(
+                    sa.or_(
+                        db.Deployment.id.in_(deployment_ids),
+                        db.Deployment.work_queue_id.in_(work_queue_ids),
+                    )
                 )
+                .values(status=DeploymentStatus.NOT_READY)
             )
-            .values(status=DeploymentStatus.NOT_READY)
-        )
 
-        if not ready_deployments:
-            return
-
-        async with PrefectServerEventsClient() as events:
-            for deployment_id in ready_deployments:
-                await events.emit(
-                    await deployment_status_event(
-                        session=session,
-                        deployment_id=deployment_id,
-                        status=DeploymentStatus.NOT_READY,
-                        occurred=now("UTC"),
+            if not ready_deployments:
+                return
+
+            async with PrefectServerEventsClient() as events:
+                for deployment_id in ready_deployments:
+                    await events.emit(
+                        await deployment_status_event(
+                            session=session,
+                            deployment_id=deployment_id,
+                            status=DeploymentStatus.NOT_READY,
+                            occurred=now("UTC"),
+                        )
                     )
-                )
+    except Exception as exc:
+        logger.error(f"Error marking deployments as not ready: {exc}", exc_info=True)
 
 
 async def with_system_labels_for_deployment(
diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py
index 8a95e4cf4394..abe91170a285 100644
--- a/src/prefect/server/models/work_queues.py
+++ b/src/prefect/server/models/work_queues.py
@@ -533,7 +533,6 @@ async def record_work_queue_polls(
     )
 
 
-@db_injector
 async def mark_work_queues_ready(
     db: PrefectDBInterface,
     polled_work_queue_ids: Sequence[UUID],
diff --git a/ui-v2/src/api/prefect.ts b/ui-v2/src/api/prefect.ts
index 5e1ca5db2e5a..e9725d7b7581 100644
--- a/ui-v2/src/api/prefect.ts
+++ b/ui-v2/src/api/prefect.ts
@@ -4015,12 +4015,12 @@ export interface components {
 		};
 		/** Body_count_deployments_deployments_count_post */
 		Body_count_deployments_deployments_count_post: {
-			flows?: components["schemas"]["FlowFilter"];
-			flow_runs?: components["schemas"]["FlowRunFilter"];
-			task_runs?: components["schemas"]["TaskRunFilter"];
-			deployments?: components["schemas"]["DeploymentFilter"];
-			work_pools?: components["schemas"]["WorkPoolFilter"];
-			work_pool_queues?: components["schemas"]["WorkQueueFilter"];
+			flows?: components["schemas"]["FlowFilter"] | null;
+			flow_runs?: components["schemas"]["FlowRunFilter"] | null;
+			task_runs?: components["schemas"]["TaskRunFilter"] | null;
+			deployments?: components["schemas"]["DeploymentFilter"] | null;
+			work_pools?: components["schemas"]["WorkPoolFilter"] | null;
+			work_pool_queues?: components["schemas"]["WorkQueueFilter"] | null;
 		};
 		/** Body_count_flow_runs_flow_runs_count_post */
 		Body_count_flow_runs_flow_runs_count_post: {
@@ -4235,12 +4235,12 @@
 			 * @default 1
 			 */
 			page: number;
-			flows?: components["schemas"]["FlowFilter"];
-			flow_runs?: components["schemas"]["FlowRunFilter"];
-			task_runs?: components["schemas"]["TaskRunFilter"];
-			deployments?: components["schemas"]["DeploymentFilter"];
-			work_pools?: components["schemas"]["WorkPoolFilter"];
-			work_pool_queues?: components["schemas"]["WorkQueueFilter"];
+			flows?: components["schemas"]["FlowFilter"] | null;
+			flow_runs?: components["schemas"]["FlowRunFilter"] | null;
+			task_runs?: components["schemas"]["TaskRunFilter"] | null;
+			deployments?: components["schemas"]["DeploymentFilter"] | null;
+			work_pools?: components["schemas"]["WorkPoolFilter"] | null;
+			work_pool_queues?: components["schemas"]["WorkQueueFilter"] | null;
 			/** @default NAME_ASC */
 			sort: components["schemas"]["DeploymentSort"];
 			/**
@@ -4421,12 +4421,12 @@
 			 * @default 0
 			 */
 			offset: number;
-			flows?: components["schemas"]["FlowFilter"];
-			flow_runs?: components["schemas"]["FlowRunFilter"];
-			task_runs?: components["schemas"]["TaskRunFilter"];
-			deployments?: components["schemas"]["DeploymentFilter"];
-			work_pools?: components["schemas"]["WorkPoolFilter"];
-			work_pool_queues?: components["schemas"]["WorkQueueFilter"];
+			flows?: components["schemas"]["FlowFilter"] | null;
+			flow_runs?: components["schemas"]["FlowRunFilter"] | null;
+			task_runs?: components["schemas"]["TaskRunFilter"] | null;
+			deployments?: components["schemas"]["DeploymentFilter"] | null;
+			work_pools?: components["schemas"]["WorkPoolFilter"] | null;
+			work_pool_queues?: components["schemas"]["WorkQueueFilter"] | null;
 			/** @default NAME_ASC */
 			sort: components["schemas"]["DeploymentSort"];
 			/**
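
Note on the schema and type changes above: switching the body filter parameters from the bare `schemas.filters.FlowFilter = None` style to `Optional[...] = None` is what makes the regenerated OpenAPI document describe each filter as `anyOf: [{"$ref": ...}, {"type": "null"}]`, which openapi-typescript then renders as the `| null` unions in `ui-v2/src/api/prefect.ts`. A minimal sketch of that behaviour under FastAPI with Pydantic v2, using a hypothetical standalone app and made-up filter models rather than Prefect's real ones:

import json
from typing import Optional

from fastapi import Body, FastAPI
from pydantic import BaseModel

app = FastAPI()


class FlowFilter(BaseModel):
    # Hypothetical stand-in for schemas.filters.FlowFilter
    name: Optional[str] = None


class FlowRunFilter(BaseModel):
    # Hypothetical stand-in for schemas.filters.FlowRunFilter
    state: Optional[str] = None


@app.post("/deployments/count")
async def count_deployments(
    flows: Optional[FlowFilter] = Body(None),
    flow_runs: Optional[FlowRunFilter] = Body(None),
) -> int:
    # A null or omitted filter means "do not filter on this dimension".
    return 0


# The generated request-body schema should now contain
# {"anyOf": [{"$ref": "#/components/schemas/FlowFilter"}, {"type": "null"}]}
# for "flows" instead of a bare "$ref".
print(json.dumps(app.openapi()["components"]["schemas"], indent=2))

Since the defaults were already None, omitting a filter or sending null keeps the endpoints' behaviour the same; the annotation mainly makes that already-accepted null explicit in the published schema and generated client types.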
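Note on the `db=db` arguments threaded through `background_tasks.add_task(...)` above: `mark_deployments_ready` and `mark_work_queues_ready` no longer carry the `@db_injector` decorator, so nothing resolves the database interface for them automatically and each call site now has to forward the request-scoped `db` explicitly. The new `try`/`except` around each function body exists because these functions run as background tasks after the response has been sent, where an unhandled exception can no longer be turned into an HTTP error. A rough sketch of the pattern, with a hypothetical `Database` class standing in for `PrefectDBInterface` and a made-up route:

import logging
from typing import Iterable, Optional
from uuid import UUID

from fastapi import BackgroundTasks, Depends, FastAPI

logger = logging.getLogger("example")
app = FastAPI()


class Database:
    """Hypothetical stand-in for PrefectDBInterface."""

    async def mark_ready(self, deployment_ids: Iterable[UUID]) -> None:
        ...


def provide_database() -> Database:
    # Hypothetical stand-in for provide_database_interface
    return Database()


async def mark_deployments_ready(
    db: Database,
    deployment_ids: Optional[Iterable[UUID]] = None,
) -> None:
    # No decorator injects `db` here, so the caller must pass it in.
    try:
        await db.mark_ready(deployment_ids or [])
    except Exception as exc:
        # This runs after the response is sent; an exception here cannot be
        # turned into an HTTP error, so log it instead of letting it escape.
        logger.error("Error marking deployments as ready: %s", exc, exc_info=True)


@app.post("/deployments/get_scheduled_flow_runs")
async def get_scheduled_flow_runs(
    background_tasks: BackgroundTasks,
    db: Database = Depends(provide_database),
) -> list[dict]:
    background_tasks.add_task(
        mark_deployments_ready,
        db=db,  # forwarded explicitly, mirroring the change in this diff
        deployment_ids=[],
    )
    return []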