From 74adb9fe4772f43a402fb978720be79f9951f3ee Mon Sep 17 00:00:00 2001 From: Devin Villarosa <102188207+devinvillarosa@users.noreply.github.com> Date: Thu, 2 Jan 2025 15:45:44 -0800 Subject: [PATCH 1/2] [UI v2] feat: Updates loader for concurrency limit route to include active task runs logic (#16522) --- ...y-limits-create-or-edit-dialog.stories.tsx | 6 +- ...-concurrency-limits-data-table.stories.tsx | 7 +- ...ncurrency-limits-delete-dialog.stories.tsx | 6 +- .../index.ts | 1 + .../index.tsx | 3 - ...urrency-limit-active-task-runs.stories.tsx | 49 ++++++ ...run-concurrency-limit-active-task-runs.tsx | 22 +++ ...-run-concurrency-limit-details.stories.tsx | 2 +- ...k-run-concurrency-limit-header.stories.tsx | 7 +- .../task-run-concurrency-limit-page/index.tsx | 43 ++++- ...k-run-concurrency-limit-tab-navigation.tsx | 9 +- ...-concurrency-limits-data-table.stories.tsx | 7 +- ...ncurrency-limits-delete-dialog.stories.tsx | 6 +- ...concurrency-limit-reset-dialog.stories.tsx | 6 +- .../ui/run-card/run-card.stories.tsx | 10 +- ui-v2/src/components/ui/run-card/run-card.tsx | 10 +- .../hooks/global-concurrency-limits.test.ts | 52 ++---- .../task-run-concurrency-limits.test.tsx | 119 +++++++++++-- .../src/hooks/task-run-concurrency-limits.ts | 163 ++++++++++++++++-- .../utils => }/mocks/create-fake-flow-run.ts | 0 .../utils => }/mocks/create-fake-flow.ts | 0 .../create-fake-global-concurrency-limit.ts | 0 .../utils => }/mocks/create-fake-state.ts | 0 .../create-fake-take-run-concurrency-limit.ts | 0 .../utils => }/mocks/create-fake-task-run.ts | 0 ui-v2/src/mocks/index.ts | 6 + .../concurrency-limit.$id.tsx | 4 +- ui-v2/src/storybook/utils/index.ts | 7 - 28 files changed, 409 insertions(+), 136 deletions(-) create mode 100644 ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.ts delete mode 100644 ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.tsx create mode 100644 ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.stories.tsx create mode 100644 ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx rename ui-v2/src/{storybook/utils => }/mocks/create-fake-flow-run.ts (100%) rename ui-v2/src/{storybook/utils => }/mocks/create-fake-flow.ts (100%) rename ui-v2/src/{storybook/utils => }/mocks/create-fake-global-concurrency-limit.ts (100%) rename ui-v2/src/{storybook/utils => }/mocks/create-fake-state.ts (100%) rename ui-v2/src/{storybook/utils => }/mocks/create-fake-take-run-concurrency-limit.ts (100%) rename ui-v2/src/{storybook/utils => }/mocks/create-fake-task-run.ts (100%) diff --git a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-create-or-edit-dialog/global-concurrency-limits-create-or-edit-dialog.stories.tsx b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-create-or-edit-dialog/global-concurrency-limits-create-or-edit-dialog.stories.tsx index b1d648b8fbbb..b0cc2f9ee464 100644 --- a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-create-or-edit-dialog/global-concurrency-limits-create-or-edit-dialog.stories.tsx +++ b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-create-or-edit-dialog/global-concurrency-limits-create-or-edit-dialog.stories.tsx @@ -1,7 +1,5 @@ -import { - createFakeGlobalConcurrencyLimit, - reactQueryDecorator, -} from "@/storybook/utils"; +import { createFakeGlobalConcurrencyLimit } from "@/mocks"; +import { reactQueryDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { GlobalConcurrencyLimitsCreateOrEditDialog } from "./global-concurrency-limits-create-or-edit-dialog"; diff --git a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-data-table/global-concurrency-limits-data-table.stories.tsx b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-data-table/global-concurrency-limits-data-table.stories.tsx index 08b27cf793e2..0f490e7192fb 100644 --- a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-data-table/global-concurrency-limits-data-table.stories.tsx +++ b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-data-table/global-concurrency-limits-data-table.stories.tsx @@ -1,8 +1,5 @@ -import { - createFakeGlobalConcurrencyLimit, - reactQueryDecorator, - toastDecorator, -} from "@/storybook/utils"; +import { createFakeGlobalConcurrencyLimit } from "@/mocks"; +import { reactQueryDecorator, toastDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { Table as GlobalConcurrencyLimitsDataTable } from "./global-concurrency-limits-data-table"; diff --git a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-delete-dialog/global-concurrency-limits-delete-dialog.stories.tsx b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-delete-dialog/global-concurrency-limits-delete-dialog.stories.tsx index 98ed01714926..9a383286d5ff 100644 --- a/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-delete-dialog/global-concurrency-limits-delete-dialog.stories.tsx +++ b/ui-v2/src/components/concurrency/global-concurrency-limits/global-concurrency-limits-delete-dialog/global-concurrency-limits-delete-dialog.stories.tsx @@ -1,7 +1,5 @@ -import { - createFakeGlobalConcurrencyLimit, - reactQueryDecorator, -} from "@/storybook/utils"; +import { createFakeGlobalConcurrencyLimit } from "@/mocks"; +import { reactQueryDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { GlobalConcurrencyLimitsDeleteDialog } from "./global-concurrency-limits-delete-dialog"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.ts b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.ts new file mode 100644 index 000000000000..11ec8836f976 --- /dev/null +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.ts @@ -0,0 +1 @@ +export { TaskRunConcurrencyLimitActiveTaskRuns } from "./task-run-concurrency-limit-active-task-runs"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.tsx deleted file mode 100644 index 20337917f960..000000000000 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/index.tsx +++ /dev/null @@ -1,3 +0,0 @@ -export const TaskRunConcurrencyLimitActiveTaskRuns = () => { - return
TODO: Active Task Runs View
; -}; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.stories.tsx new file mode 100644 index 000000000000..9382eb623cb0 --- /dev/null +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.stories.tsx @@ -0,0 +1,49 @@ +import { createFakeFlow, createFakeFlowRun, createFakeTaskRun } from "@/mocks"; +import { routerDecorator } from "@/storybook/utils"; +import type { Meta, StoryObj } from "@storybook/react"; + +import { TaskRunConcurrencyLimitActiveTaskRuns } from "./task-run-concurrency-limit-active-task-runs"; + +const MOCK_DATA = [ + { + flow: createFakeFlow(), + flowRun: createFakeFlowRun(), + taskRun: createFakeTaskRun(), + }, + { + flow: createFakeFlow(), + flowRun: createFakeFlowRun(), + taskRun: createFakeTaskRun({ tags: ["foo", "bar", "baz"] }), + }, + { + flow: createFakeFlow(), + flowRun: createFakeFlowRun(), + taskRun: createFakeTaskRun(), + }, + { + taskRun: createFakeTaskRun(), + }, + { + flow: createFakeFlow(), + flowRun: createFakeFlowRun(), + taskRun: createFakeTaskRun(), + }, + { + taskRun: createFakeTaskRun({ tags: ["foo", "bar", "baz"] }), + }, +]; + +const meta: Meta = { + title: + "Components/Concurrency/TaskRunConcurrencyLimits/TaskRunConcurrencyLimitActiveTaskRuns", + component: TaskRunConcurrencyLimitActiveTaskRuns, + decorators: [routerDecorator], + args: { data: MOCK_DATA }, +}; +export default meta; + +type Story = StoryObj; + +export const story: Story = { + name: "TaskRunConcurrencyLimitActiveTaskRuns", +}; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx new file mode 100644 index 000000000000..13a8050dc046 --- /dev/null +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs/task-run-concurrency-limit-active-task-runs.tsx @@ -0,0 +1,22 @@ +import type { components } from "@/api/prefect"; +import { RunCard } from "@/components/ui/run-card"; + +type Props = { + data: Array<{ + taskRun: components["schemas"]["TaskRun"]; + flowRun?: components["schemas"]["FlowRun"] | null; + flow?: components["schemas"]["Flow"] | null; + }>; +}; + +export const TaskRunConcurrencyLimitActiveTaskRuns = ({ data }: Props) => { + return ( + + ); +}; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-details/task-run-concurrency-limit-details.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-details/task-run-concurrency-limit-details.stories.tsx index 50ffa97467d8..92543ee5567c 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-details/task-run-concurrency-limit-details.stories.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-details/task-run-concurrency-limit-details.stories.tsx @@ -1,4 +1,4 @@ -import { createFakeTaskRunConcurrencyLimit } from "@/storybook/utils"; +import { createFakeTaskRunConcurrencyLimit } from "@/mocks"; import type { Meta, StoryObj } from "@storybook/react"; import { TaskRunConcurrencyLimitDetails } from "./task-run-concurrency-limit-details"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-header/task-run-concurrency-limit-header.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-header/task-run-concurrency-limit-header.stories.tsx index f67f06081a5d..465309a04514 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-header/task-run-concurrency-limit-header.stories.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-header/task-run-concurrency-limit-header.stories.tsx @@ -1,8 +1,5 @@ -import { - createFakeTaskRunConcurrencyLimit, - routerDecorator, - toastDecorator, -} from "@/storybook/utils"; +import { createFakeTaskRunConcurrencyLimit } from "@/mocks"; +import { routerDecorator, toastDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/index.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/index.tsx index 66c475f0450c..99cbbbfc72b8 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/index.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/index.tsx @@ -1,9 +1,13 @@ import { TaskRunConcurrencyLimitHeader } from "@/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-header"; -import { useGetTaskRunConcurrencyLimit } from "@/hooks/task-run-concurrency-limits"; +import { buildConcurrenyLimitDetailsActiveRunsQuery } from "@/hooks/task-run-concurrency-limits"; import { useState } from "react"; import { TaskRunConcurrencyLimitActiveTaskRuns } from "@/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-active-task-runs"; import { TaskRunConcurrencyLimitDetails } from "@/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-details"; +import { Card } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { useSuspenseQuery } from "@tanstack/react-query"; +import { Await } from "@tanstack/react-router"; import { type Dialogs, TaskRunConcurrencyLimitDialog, @@ -16,7 +20,9 @@ type Props = { export const TaskRunConcurrencyLimitPage = ({ id }: Props) => { const [openDialog, setOpenDialog] = useState(null); - const { data } = useGetTaskRunConcurrencyLimit(id); + const { data } = useSuspenseQuery( + buildConcurrenyLimitDetailsActiveRunsQuery(id), + ); const handleOpenDeleteDialog = () => setOpenDialog("delete"); const handleOpenResetDialog = () => setOpenDialog("reset"); @@ -29,23 +35,32 @@ export const TaskRunConcurrencyLimitPage = ({ id }: Props) => { } }; + const { activeTaskRuns, taskRunConcurrencyLimit } = data; + const numActiveTaskRuns = taskRunConcurrencyLimit.active_slots?.length; return ( <>
- } - /> - + + } + > + {(promiseData) => ( + + )} + + +
{ ); }; + +type SkeletonLoadingProps = { length?: number }; +const SkeletonLoading = ({ length = 0 }: SkeletonLoadingProps) => ( +
+ {Array.from({ length }, (_, index) => ( + + + + + ))} +
+); diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/task-run-concurrency-limit-tab-navigation.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/task-run-concurrency-limit-tab-navigation.tsx index 35d5b2ec45f9..5cfa90d4f957 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/task-run-concurrency-limit-tab-navigation.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page/task-run-concurrency-limit-tab-navigation.tsx @@ -20,13 +20,12 @@ const TAB_OPTIONS: Record = { } as const; type Props = { - activetaskRunsView: React.ReactNode; + /** Should add ActiveTaskRun component */ + children: React.ReactNode; }; // TODO: Move Tabs for navigation to a generic styled component -export const TaskRunConcurrencyLimitTabNavigation = ({ - activetaskRunsView, -}: Props) => { +export const TaskRunConcurrencyLimitTabNavigation = ({ children }: Props) => { const { tab } = routeApi.useSearch(); const navigate = routeApi.useNavigate(); @@ -48,7 +47,7 @@ export const TaskRunConcurrencyLimitTabNavigation = ({ - {activetaskRunsView} + {children} ); diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-data-table/task-run-concurrency-limits-data-table.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-data-table/task-run-concurrency-limits-data-table.stories.tsx index b210b6648154..e3c23a5160e2 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-data-table/task-run-concurrency-limits-data-table.stories.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-data-table/task-run-concurrency-limits-data-table.stories.tsx @@ -1,8 +1,5 @@ -import { - createFakeTaskRunConcurrencyLimit, - routerDecorator, - toastDecorator, -} from "@/storybook/utils"; +import { createFakeTaskRunConcurrencyLimit } from "@/mocks"; +import { routerDecorator, toastDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { Table as TaskRunConcurrencyLimitsDataTable } from "./task-run-concurrency-limits-data-table"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-delete-dialog/task-run-concurrency-limits-delete-dialog.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-delete-dialog/task-run-concurrency-limits-delete-dialog.stories.tsx index be9f881c2d73..1b15b12cf447 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-delete-dialog/task-run-concurrency-limits-delete-dialog.stories.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-delete-dialog/task-run-concurrency-limits-delete-dialog.stories.tsx @@ -1,7 +1,5 @@ -import { - createFakeTaskRunConcurrencyLimit, - reactQueryDecorator, -} from "@/storybook/utils"; +import { createFakeTaskRunConcurrencyLimit } from "@/mocks"; +import { reactQueryDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { TaskRunConcurrencyLimitsDeleteDialog } from "./task-run-concurrency-limits-delete-dialog"; diff --git a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-reset-dialog/task-run-concurrency-limit-reset-dialog.stories.tsx b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-reset-dialog/task-run-concurrency-limit-reset-dialog.stories.tsx index cf6578b3888d..4fbe049046dc 100644 --- a/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-reset-dialog/task-run-concurrency-limit-reset-dialog.stories.tsx +++ b/ui-v2/src/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limits-reset-dialog/task-run-concurrency-limit-reset-dialog.stories.tsx @@ -1,7 +1,5 @@ -import { - createFakeTaskRunConcurrencyLimit, - reactQueryDecorator, -} from "@/storybook/utils"; +import { createFakeTaskRunConcurrencyLimit } from "@/mocks"; +import { reactQueryDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; import { fn } from "@storybook/test"; import { TaskRunConcurrencyLimitsResetDialog } from "./task-run-concurrency-limits-reset-dialog"; diff --git a/ui-v2/src/components/ui/run-card/run-card.stories.tsx b/ui-v2/src/components/ui/run-card/run-card.stories.tsx index 40e861224968..1ea779b7e871 100644 --- a/ui-v2/src/components/ui/run-card/run-card.stories.tsx +++ b/ui-v2/src/components/ui/run-card/run-card.stories.tsx @@ -1,12 +1,6 @@ +import { createFakeFlow, createFakeFlowRun, createFakeTaskRun } from "@/mocks"; +import { routerDecorator } from "@/storybook/utils"; import type { Meta, StoryObj } from "@storybook/react"; - -import { - createFakeFlow, - createFakeFlowRun, - createFakeTaskRun, - routerDecorator, -} from "@/storybook/utils"; - import { RunCard } from "./run-card"; const meta: Meta = { diff --git a/ui-v2/src/components/ui/run-card/run-card.tsx b/ui-v2/src/components/ui/run-card/run-card.tsx index 6775cd7b9cf1..f6042b38c657 100644 --- a/ui-v2/src/components/ui/run-card/run-card.tsx +++ b/ui-v2/src/components/ui/run-card/run-card.tsx @@ -18,8 +18,8 @@ const getValues = ({ flowRun, taskRun, }: { - flowRun: undefined | components["schemas"]["FlowRun"]; - taskRun: undefined | components["schemas"]["TaskRun"]; + flowRun: null | undefined | components["schemas"]["FlowRun"]; + taskRun: null | undefined | components["schemas"]["TaskRun"]; }) => { if (taskRun) { const { state, start_time, tags, estimated_run_time } = taskRun; @@ -34,10 +34,10 @@ const getValues = ({ }; type Props = { - flow?: components["schemas"]["Flow"]; - flowRun?: components["schemas"]["FlowRun"]; + flow?: components["schemas"]["Flow"] | null; + flowRun?: components["schemas"]["FlowRun"] | null; /** If task run is included, uses fields from task run over flow run */ - taskRun?: components["schemas"]["TaskRun"]; + taskRun?: components["schemas"]["TaskRun"] | null; }; export const RunCard = ({ flow, flowRun, taskRun }: Props) => { diff --git a/ui-v2/src/hooks/global-concurrency-limits.test.ts b/ui-v2/src/hooks/global-concurrency-limits.test.ts index 8b98bcc09960..16780f1f00a7 100644 --- a/ui-v2/src/hooks/global-concurrency-limits.test.ts +++ b/ui-v2/src/hooks/global-concurrency-limits.test.ts @@ -4,6 +4,7 @@ import { createWrapper, server } from "@tests/utils"; import { http, HttpResponse } from "msw"; import { describe, expect, it } from "vitest"; +import { createFakeGlobalConcurrencyLimit } from "@/mocks"; import { type GlobalConcurrencyLimit, queryKeyFactory, @@ -14,18 +15,10 @@ import { } from "./global-concurrency-limits"; describe("global concurrency limits hooks", () => { - const seedGlobalConcurrencyLimits = () => [ - { - id: "0", - created: "2021-01-01T00:00:00Z", - updated: "2021-01-01T00:00:00Z", - active: false, - name: "global concurrency limit 0", - limit: 0, - active_slots: 0, - slot_decay_per_second: 0, - }, - ]; + const MOCK_DATA = createFakeGlobalConcurrencyLimit({ + id: "0", + }); + const seedData = () => [MOCK_DATA]; const mockFetchGlobalConcurrencyLimitsAPI = ( globalConcurrencyLimits: Array, @@ -50,7 +43,7 @@ describe("global concurrency limits hooks", () => { */ it("is stores list data into the appropriate list query when using useQuery()", async () => { // ------------ Mock API requests when cache is empty - const mockList = seedGlobalConcurrencyLimits(); + const mockList = seedData(); mockFetchGlobalConcurrencyLimitsAPI(mockList); // ------------ Initialize hooks to test @@ -73,16 +66,11 @@ describe("global concurrency limits hooks", () => { const queryClient = new QueryClient(); // ------------ Mock API requests after queries are invalidated - const mockData = seedGlobalConcurrencyLimits().filter( - (limit) => limit.id !== ID_TO_DELETE, - ); + const mockData = seedData().filter((limit) => limit.id !== ID_TO_DELETE); mockFetchGlobalConcurrencyLimitsAPI(mockData); // ------------ Initialize cache - queryClient.setQueryData( - queryKeyFactory.list(filter), - seedGlobalConcurrencyLimits(), - ); + queryClient.setQueryData(queryKeyFactory.list(filter), seedData()); // ------------ Initialize hooks to test const { result: useListGlobalConcurrencyLimitsResult } = renderHook( @@ -129,23 +117,16 @@ describe("global concurrency limits hooks", () => { }; // ------------ Mock API requests after queries are invalidated - const NEW_LIMIT_DATA = { - ...MOCK_NEW_LIMIT, + const NEW_LIMIT_DATA = createFakeGlobalConcurrencyLimit({ id: MOCK_NEW_LIMIT_ID, - created: "2021-01-01T00:00:00Z", - updated: "2021-01-01T00:00:00Z", - active_slots: 0, - slot_decay_per_second: 0, - }; + ...MOCK_NEW_LIMIT, + }); - const mockData = [...seedGlobalConcurrencyLimits(), NEW_LIMIT_DATA]; + const mockData = [...seedData(), NEW_LIMIT_DATA]; mockFetchGlobalConcurrencyLimitsAPI(mockData); // ------------ Initialize cache - queryClient.setQueryData( - queryKeyFactory.list(filter), - seedGlobalConcurrencyLimits(), - ); + queryClient.setQueryData(queryKeyFactory.list(filter), seedData()); // ------------ Initialize hooks to test const { result: useListGlobalConcurrencyLimitsResult } = renderHook( @@ -201,17 +182,14 @@ describe("global concurrency limits hooks", () => { }; // ------------ Mock API requests after queries are invalidated - const mockData = seedGlobalConcurrencyLimits().map((limit) => + const mockData = seedData().map((limit) => limit.id === MOCK_UPDATE_LIMIT_ID ? UPDATED_LIMIT : limit, ); mockFetchGlobalConcurrencyLimitsAPI(mockData); // ------------ Initialize cache - queryClient.setQueryData( - queryKeyFactory.list(filter), - seedGlobalConcurrencyLimits(), - ); + queryClient.setQueryData(queryKeyFactory.list(filter), seedData()); // ------------ Initialize hooks to test const { result: useListGlobalConcurrencyLimitsResult } = renderHook( diff --git a/ui-v2/src/hooks/task-run-concurrency-limits.test.tsx b/ui-v2/src/hooks/task-run-concurrency-limits.test.tsx index f0aa58522b8d..bf12fc2d24f7 100644 --- a/ui-v2/src/hooks/task-run-concurrency-limits.test.tsx +++ b/ui-v2/src/hooks/task-run-concurrency-limits.test.tsx @@ -1,10 +1,18 @@ -import { QueryClient } from "@tanstack/react-query"; +import { components } from "@/api/prefect"; +import { + createFakeFlow, + createFakeFlowRun, + createFakeTaskRun, + createFakeTaskRunConcurrencyLimit, +} from "@/mocks"; +import { QueryClient, useSuspenseQuery } from "@tanstack/react-query"; import { act, renderHook, waitFor } from "@testing-library/react"; import { createWrapper, server } from "@tests/utils"; import { http, HttpResponse } from "msw"; import { describe, expect, it } from "vitest"; import { type TaskRunConcurrencyLimit, + buildConcurrenyLimitDetailsActiveRunsQuery, queryKeyFactory, useCreateTaskRunConcurrencyLimit, useDeleteTaskRunConcurrencyLimit, @@ -14,16 +22,11 @@ import { } from "./task-run-concurrency-limits"; describe("task run concurrency limits hooks", () => { - const seedData = () => [ - { - id: "0", - created: "2021-01-01T00:00:00Z", - updated: "2021-01-01T00:00:00Z", - tag: "my tag 0", - concurrency_limit: 1, - active_slots: [] as Array, - }, - ]; + const MOCK_TASK_RUN_CONCURRENCY_LIMIT = createFakeTaskRunConcurrencyLimit({ + id: "0", + tag: "my tag 0", + }); + const seedData = () => [MOCK_TASK_RUN_CONCURRENCY_LIMIT]; const mockFetchDetailsAPI = (data: TaskRunConcurrencyLimit) => { server.use( @@ -143,11 +146,9 @@ describe("task run concurrency limits hooks", () => { it("useCreateTaskRunConcurrencyLimit() invalidates cache and fetches updated value", async () => { const queryClient = new QueryClient(); const MOCK_NEW_DATA_ID = "1"; - const MOCK_NEW_DATA = { - tag: "my tag 1", - concurrency_limit: 2, - active_slots: [], - }; + const MOCK_NEW_DATA = createFakeTaskRunConcurrencyLimit({ + id: MOCK_NEW_DATA_ID, + }); // ------------ Mock API requests after queries are invalidated const NEW_LIMIT_DATA = { @@ -246,3 +247,89 @@ describe("task run concurrency limits hooks", () => { expect(limit?.active_slots).toHaveLength(0); }); }); + +describe("buildConcurrenyLimitDetailsActiveRunsQuery()", () => { + const MOCK_DATA = createFakeTaskRunConcurrencyLimit({ + id: "0", + tag: "my tag 0", + concurrency_limit: 1, + active_slots: ["task_0"], + }); + const seedData = () => MOCK_DATA; + const MOCK_TASK_RUNS = [ + createFakeTaskRun({ id: "task_0", flow_run_id: "flow_run_0" }), + ]; + const MOCK_FLOW_RUNS = [ + createFakeFlowRun({ id: "flow_run_0", flow_id: "flow_0" }), + ]; + const MOCK_FLOW = createFakeFlow({ id: "flow_0" }); + + const mockFetchDetailsAPI = (data: TaskRunConcurrencyLimit) => { + server.use( + http.get("http://localhost:4200/api/concurrency_limits/:id", () => { + return HttpResponse.json(data); + }), + ); + }; + + const mockTaskRunsFiltersAPI = ( + data: Array, + ) => { + server.use( + http.post("http://localhost:4200/api/task_runs/filter", () => { + return HttpResponse.json(data); + }), + ); + }; + const mockFlowRunsFiltersAPI = ( + data: Array, + ) => { + server.use( + http.post("http://localhost:4200/api/flow_runs/filter", () => { + return HttpResponse.json(data); + }), + ); + }; + + const mockGetFlowAPI = (data: components["schemas"]["Flow"]) => { + server.use( + http.get("http://localhost:4200/api/flows/:id", () => { + return HttpResponse.json(data); + }), + ); + }; + + /** + * Data Management: + * - Asserts waterfall of APIs will fire to get details on the task run concurrency limit + * - Other APIs will fire to get details on each task run, flow run associated with the task run, and overall flow details + */ + it("will fetch a necessary data for task runs", async () => { + const MOCK_DATA = seedData(); + const MOCK_ID = MOCK_DATA.id; + mockFetchDetailsAPI(MOCK_DATA); + mockTaskRunsFiltersAPI(MOCK_TASK_RUNS); + mockFlowRunsFiltersAPI(MOCK_FLOW_RUNS); + mockGetFlowAPI(MOCK_FLOW); + + const { result } = renderHook( + () => + useSuspenseQuery(buildConcurrenyLimitDetailsActiveRunsQuery(MOCK_ID)), + { wrapper: createWrapper() }, + ); + + // ------------ Assert + await waitFor(() => expect(result.current.isSuccess).toBe(true)); + expect(result.current.data.taskRunConcurrencyLimit).toMatchObject( + MOCK_DATA, + ); + const activeTaskRunsResult = await result.current.data.activeTaskRuns; + expect(activeTaskRunsResult).toEqual([ + { + flow: MOCK_FLOW, + flowRun: MOCK_FLOW_RUNS[0], + taskRun: MOCK_TASK_RUNS[0], + }, + ]); + }); +}); diff --git a/ui-v2/src/hooks/task-run-concurrency-limits.ts b/ui-v2/src/hooks/task-run-concurrency-limits.ts index 6a6d51800bf4..14ef84ebf2a9 100644 --- a/ui-v2/src/hooks/task-run-concurrency-limits.ts +++ b/ui-v2/src/hooks/task-run-concurrency-limits.ts @@ -14,13 +14,16 @@ export type TaskRunConcurrencyLimitsFilter = /** * ``` * 🏗️ Task run concurrency limits queries construction 👷 - * all => ['task-run-concurrency-limits'] // key to match ['task-run-concurrency-limits', ... - * list => ['task-run-concurrency-limits', 'list'] // key to match ['task-run-concurrency-limits', 'list', ... - * ['task-run-concurrency-limits', 'list', { ...filter1 }] - * ['task-run-concurrency-limits', 'list', { ...filter2 }] - * details => ['task-run-concurrency-limits', 'details'] // key to match ['task-run-concurrency-limits', 'details', ... - * ['task-run-concurrency-limits', 'details', id1] - * ['task-run-concurrency-limits', 'details', id2] + * all => ['task-run-concurrency-limits'] // key to match ['task-run-concurrency-limits', ... + * list => ['task-run-concurrency-limits', 'list'] // key to match ['task-run-concurrency-limits', 'list', ... + * ['task-run-concurrency-limits', 'list', { ...filter1 }] + * ['task-run-concurrency-limits', 'list', { ...filter2 }] + * details => ['task-run-concurrency-limits', 'details'] // key to match ['task-run-concurrency-limits', 'details', ... + * ['task-run-concurrency-limits', 'details', id1] + * ['task-run-concurrency-limits', 'details', id2] + * activeTaskRuns => ['task-run-concurrency-limits', 'details', 'active-task-runs'] // key to match ['task-run-concurrency-limits', 'details', 'active-runs', ... + * ['task-run-concurrency-limits', 'details', 'active-task-runs', id1] + * ['task-run-concurrency-limits', 'details', 'active-task-runs', id2] * ``` * */ export const queryKeyFactory = { @@ -30,6 +33,10 @@ export const queryKeyFactory = { [...queryKeyFactory.lists(), filter] as const, details: () => [...queryKeyFactory.all(), "details"] as const, detail: (id: string) => [...queryKeyFactory.details(), id] as const, + activeTaskRuns: () => + [...queryKeyFactory.details(), "active-task-runs"] as const, + activeTaskRun: (id: string) => + [...queryKeyFactory.activeTaskRuns(), id] as const, }; // ----- 🔑 Queries 🗄️ @@ -48,15 +55,21 @@ export const buildListTaskRunConcurrencyLimitsQuery = ( refetchInterval: 30_000, }); +const fetchTaskRunConcurrencyLimit = async (id: string) => { + // GET task-run-concurrency-limit by id + const res = await getQueryService().GET("/concurrency_limits/{id}", { + params: { path: { id } }, + }); + if (!res.data) { + throw new Error("'data' expected"); + } + return res.data; +}; + export const buildDetailTaskRunConcurrencyLimitsQuery = (id: string) => queryOptions({ queryKey: queryKeyFactory.detail(id), - queryFn: async () => { - const res = await getQueryService().GET("/concurrency_limits/{id}", { - params: { path: { id } }, - }); - return res.data as TaskRunConcurrencyLimit; // Expecting data to be truthy; - }, + queryFn: () => fetchTaskRunConcurrencyLimit(id), }); /** @@ -204,3 +217,127 @@ export const useResetTaskRunConcurrencyLimitTag = () => { ...rest, }; }; + +const fetchActiveTaskRunDetails = async (activeSlots: Array) => { + const taskRuns = await getQueryService().POST("/task_runs/filter", { + body: { + task_runs: { + id: { any_: activeSlots }, + operator: "or_", + }, + sort: "NAME_DESC", + offset: 0, + }, + }); + if (!taskRuns.data) { + throw new Error("'data' expected"); + } + const taskRunsWithFlows: Array = []; + const taskRunsOnly: Array = []; + + taskRuns.data.forEach((taskRun) => { + if (taskRun.flow_run_id) { + taskRunsWithFlows.push(taskRun); + } else { + taskRunsOnly.push(taskRun); + } + }); + + const activeTaskRunsWithoutFlows = taskRunsOnly.map((taskRun) => ({ + taskRun, + flowRun: null, + flow: null, + })); + + // Early exit if there's no task with parent flows + if (taskRunsWithFlows.length === 0) { + return activeTaskRunsWithoutFlows; + } + + // Now get parent flow information for tasks with parent flows + const flowRunsIds = taskRunsWithFlows.map( + (taskRun) => taskRun.flow_run_id as string, + ); + + // Get Flow Runs info + const flowRuns = await getQueryService().POST("/flow_runs/filter", { + body: { + flow_runs: { + id: { any_: flowRunsIds }, + operator: "or_", + }, + sort: "NAME_DESC", + offset: 0, + }, + }); + if (!flowRuns.data) { + throw new Error("'data' expected"); + } + const hasSameFlowID = flowRuns.data.every( + (flowRun) => flowRun.flow_id === flowRuns.data[0].flow_id, + ); + if (!hasSameFlowID) { + throw new Error("Flow runs has mismatching 'flow_id'"); + } + const flowID = flowRuns.data[0].flow_id; + + // Get Flow info + const flow = await getQueryService().GET("/flows/{id}", { + params: { path: { id: flowID } }, + }); + + if (!flow.data) { + throw new Error("'data' expected"); + } + + // Normalize data per active slot : + /** + * + * -> active_slot (task_run_id 1) -> flow_run (flow_run_id 1) + * concurrencyLimit -> active_slot (task_run_id 2) -> flow_run (flow_run_id 2) -> flow (flow_id) + * -> active_slot (task_run_id 3) -> flow_run (flow_run_id 3) + * + */ + const activeTaskRunsWithFlows = taskRunsWithFlows.map((taskRunsWithFlow) => { + const flowRun = flowRuns.data.find( + (flowRun) => flowRun.id === taskRunsWithFlow.flow_run_id, + ); + + if (!flowRun) { + throw new Error('"Expected to find flowRun'); + } + + return { + taskRun: taskRunsWithFlow, + flowRun, + flow: flow.data, + }; + }); + + return [...activeTaskRunsWithFlows, ...activeTaskRunsWithoutFlows]; +}; + +/** + * + * @param id + * @returns query options for a task-run concurrency limit with active run details that includes details on task run, flow run, and flow + */ +export const buildConcurrenyLimitDetailsActiveRunsQuery = (id: string) => + queryOptions({ + queryKey: queryKeyFactory.activeTaskRun(id), + queryFn: async () => { + const taskRunConcurrencyLimit = await fetchTaskRunConcurrencyLimit(id); + if (!taskRunConcurrencyLimit.active_slots) { + throw new Error("'active_slots' expected"); + } + + const activeTaskRuns = fetchActiveTaskRunDetails( + taskRunConcurrencyLimit.active_slots, + ); + + return { + taskRunConcurrencyLimit, + activeTaskRuns, // defer to return promise + }; + }, + }); diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-flow-run.ts b/ui-v2/src/mocks/create-fake-flow-run.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-flow-run.ts rename to ui-v2/src/mocks/create-fake-flow-run.ts diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-flow.ts b/ui-v2/src/mocks/create-fake-flow.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-flow.ts rename to ui-v2/src/mocks/create-fake-flow.ts diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-global-concurrency-limit.ts b/ui-v2/src/mocks/create-fake-global-concurrency-limit.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-global-concurrency-limit.ts rename to ui-v2/src/mocks/create-fake-global-concurrency-limit.ts diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-state.ts b/ui-v2/src/mocks/create-fake-state.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-state.ts rename to ui-v2/src/mocks/create-fake-state.ts diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-take-run-concurrency-limit.ts b/ui-v2/src/mocks/create-fake-take-run-concurrency-limit.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-take-run-concurrency-limit.ts rename to ui-v2/src/mocks/create-fake-take-run-concurrency-limit.ts diff --git a/ui-v2/src/storybook/utils/mocks/create-fake-task-run.ts b/ui-v2/src/mocks/create-fake-task-run.ts similarity index 100% rename from ui-v2/src/storybook/utils/mocks/create-fake-task-run.ts rename to ui-v2/src/mocks/create-fake-task-run.ts diff --git a/ui-v2/src/mocks/index.ts b/ui-v2/src/mocks/index.ts index fbb74af0f6fe..b1d422cd8a17 100644 --- a/ui-v2/src/mocks/index.ts +++ b/ui-v2/src/mocks/index.ts @@ -1 +1,7 @@ export { createFakeAutomation } from "./create-fake-automation"; +export { createFakeFlow } from "./create-fake-flow"; +export { createFakeFlowRun } from "./create-fake-flow-run"; +export { createFakeTaskRun } from "./create-fake-task-run"; +export { createFakeState } from "./create-fake-state"; +export { createFakeTaskRunConcurrencyLimit } from "./create-fake-take-run-concurrency-limit"; +export { createFakeGlobalConcurrencyLimit } from "./create-fake-global-concurrency-limit"; diff --git a/ui-v2/src/routes/concurrency-limits/concurrency-limit.$id.tsx b/ui-v2/src/routes/concurrency-limits/concurrency-limit.$id.tsx index 3905858c8ea1..7dcc21b3e069 100644 --- a/ui-v2/src/routes/concurrency-limits/concurrency-limit.$id.tsx +++ b/ui-v2/src/routes/concurrency-limits/concurrency-limit.$id.tsx @@ -1,5 +1,5 @@ import { TaskRunConcurrencyLimitPage } from "@/components/concurrency/task-run-concurrency-limits/task-run-concurrency-limit-page"; -import { buildDetailTaskRunConcurrencyLimitsQuery } from "@/hooks/task-run-concurrency-limits"; +import { buildConcurrenyLimitDetailsActiveRunsQuery } from "@/hooks/task-run-concurrency-limits"; import { createFileRoute } from "@tanstack/react-router"; import { zodValidator } from "@tanstack/zod-adapter"; import { z } from "zod"; @@ -22,7 +22,7 @@ export const Route = createFileRoute( wrapInSuspense: true, loader: ({ context, params }) => context.queryClient.ensureQueryData( - buildDetailTaskRunConcurrencyLimitsQuery(params.id), + buildConcurrenyLimitDetailsActiveRunsQuery(params.id), ), }); diff --git a/ui-v2/src/storybook/utils/index.ts b/ui-v2/src/storybook/utils/index.ts index 89700e00880d..47e59e86809f 100644 --- a/ui-v2/src/storybook/utils/index.ts +++ b/ui-v2/src/storybook/utils/index.ts @@ -1,10 +1,3 @@ export { routerDecorator } from "./router-decorator"; export { reactQueryDecorator } from "./react-query-decorator"; export { toastDecorator } from "./toast-decorator"; - -export { createFakeFlow } from "./mocks/create-fake-flow"; -export { createFakeFlowRun } from "./mocks/create-fake-flow-run"; -export { createFakeTaskRun } from "./mocks/create-fake-task-run"; -export { createFakeState } from "./mocks/create-fake-state"; -export { createFakeTaskRunConcurrencyLimit } from "./mocks/create-fake-take-run-concurrency-limit"; -export { createFakeGlobalConcurrencyLimit } from "./mocks/create-fake-global-concurrency-limit"; From 29d7a48a6f494b9a0be7669ca73f6fba3c9169a2 Mon Sep 17 00:00:00 2001 From: Adam Azzam <33043305+aaazzam@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:01:47 -0600 Subject: [PATCH 2/2] Refactor Deployment CRUD methods in client (#16576) --- src/prefect/client/orchestration/__init__.py | 579 +-------- .../orchestration/_deployments/__init__.py | 0 .../orchestration/_deployments/client.py | 1125 +++++++++++++++++ tests/client/test_prefect_client.py | 23 +- 4 files changed, 1133 insertions(+), 594 deletions(-) create mode 100644 src/prefect/client/orchestration/_deployments/__init__.py create mode 100644 src/prefect/client/orchestration/_deployments/client.py diff --git a/src/prefect/client/orchestration/__init__.py b/src/prefect/client/orchestration/__init__.py index 9103d12d5b5d..cf0673d43620 100644 --- a/src/prefect/client/orchestration/__init__.py +++ b/src/prefect/client/orchestration/__init__.py @@ -40,6 +40,11 @@ VariableAsyncClient, ) +from prefect.client.orchestration._deployments.client import ( + DeploymentClient, + DeploymentAsyncClient, +) + import prefect import prefect.exceptions import prefect.settings @@ -52,19 +57,11 @@ BlockSchemaCreate, BlockTypeCreate, BlockTypeUpdate, - DeploymentCreate, - DeploymentFlowRunCreate, - DeploymentScheduleCreate, - DeploymentScheduleUpdate, - DeploymentUpdate, FlowCreate, FlowRunCreate, FlowRunNotificationPolicyCreate, FlowRunNotificationPolicyUpdate, FlowRunUpdate, - LogCreate, - GlobalConcurrencyLimitCreate, - GlobalConcurrencyLimitUpdate, TaskRunCreate, TaskRunUpdate, WorkPoolCreate, @@ -87,9 +84,7 @@ BlockDocument, BlockSchema, BlockType, - ConcurrencyOptions, Constant, - DeploymentSchedule, Flow, FlowRunInput, FlowRunNotificationPolicy, @@ -104,13 +99,9 @@ WorkQueueStatusDetail, ) from prefect.client.schemas.responses import ( - DeploymentResponse, - FlowRunResponse, WorkerFlowRunResponse, ) -from prefect.client.schemas.schedules import SCHEDULE_TYPES from prefect.client.schemas.sorting import ( - DeploymentSort, FlowRunSort, FlowSort, TaskRunSort, @@ -255,6 +246,7 @@ class PrefectClient( LogAsyncClient, VariableAsyncClient, ConcurrencyLimitAsyncClient, + DeploymentAsyncClient, ): """ An asynchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -612,78 +604,6 @@ async def read_flow_by_name( response = await self._client.get(f"/flows/name/{flow_name}") return Flow.model_validate(response.json()) - async def create_flow_run_from_deployment( - self, - deployment_id: UUID, - *, - parameters: Optional[dict[str, Any]] = None, - context: Optional[dict[str, Any]] = None, - state: Optional[prefect.states.State[Any]] = None, - name: Optional[str] = None, - tags: Optional[Iterable[str]] = None, - idempotency_key: Optional[str] = None, - parent_task_run_id: Optional[UUID] = None, - work_queue_name: Optional[str] = None, - job_variables: Optional[dict[str, Any]] = None, - labels: Optional[KeyValueLabelsField] = None, - ) -> FlowRun: - """ - Create a flow run for a deployment. - - Args: - deployment_id: The deployment ID to create the flow run from - parameters: Parameter overrides for this flow run. Merged with the - deployment defaults - context: Optional run context data - state: The initial state for the run. If not provided, defaults to - `Scheduled` for now. Should always be a `Scheduled` type. - name: An optional name for the flow run. If not provided, the server will - generate a name. - tags: An optional iterable of tags to apply to the flow run; these tags - are merged with the deployment's tags. - idempotency_key: Optional idempotency key for creation of the flow run. - If the key matches the key of an existing flow run, the existing run will - be returned instead of creating a new one. - parent_task_run_id: if a subflow run is being created, the placeholder task - run identifier in the parent flow - work_queue_name: An optional work queue name to add this run to. If not provided, - will default to the deployment's set work queue. If one is provided that does not - exist, a new work queue will be created within the deployment's work pool. - job_variables: Optional variables that will be supplied to the flow run job. - - Raises: - httpx.RequestError: if the Prefect API does not successfully create a run for any reason - - Returns: - The flow run model - """ - parameters = parameters or {} - context = context or {} - state = state or prefect.states.Scheduled() - tags = tags or [] - - flow_run_create = DeploymentFlowRunCreate( - parameters=parameters, - context=context, - state=state.to_state_create(), - tags=list(tags), - name=name, - idempotency_key=idempotency_key, - parent_task_run_id=parent_task_run_id, - job_variables=job_variables, - labels=labels, - ) - - # done separately to avoid including this field in payloads sent to older API versions - if work_queue_name: - flow_run_create.work_queue_name = work_queue_name - - response = await self._client.post( - f"/deployments/{deployment_id}/create_flow_run", - json=flow_run_create.model_dump(mode="json", exclude_unset=True), - ) - return FlowRun.model_validate(response.json()) - async def create_flow_run( self, flow: "FlowObject[Any, R]", @@ -1466,422 +1386,6 @@ async def read_block_documents_by_type( response.json() ) - async def create_deployment( - self, - flow_id: UUID, - name: str, - version: Optional[str] = None, - schedules: Optional[list[DeploymentScheduleCreate]] = None, - concurrency_limit: Optional[int] = None, - concurrency_options: Optional[ConcurrencyOptions] = None, - parameters: Optional[dict[str, Any]] = None, - description: Optional[str] = None, - work_queue_name: Optional[str] = None, - work_pool_name: Optional[str] = None, - tags: Optional[list[str]] = None, - storage_document_id: Optional[UUID] = None, - path: Optional[str] = None, - entrypoint: Optional[str] = None, - infrastructure_document_id: Optional[UUID] = None, - parameter_openapi_schema: Optional[dict[str, Any]] = None, - paused: Optional[bool] = None, - pull_steps: Optional[list[dict[str, Any]]] = None, - enforce_parameter_schema: Optional[bool] = None, - job_variables: Optional[dict[str, Any]] = None, - ) -> UUID: - """ - Create a deployment. - - Args: - flow_id: the flow ID to create a deployment for - name: the name of the deployment - version: an optional version string for the deployment - tags: an optional list of tags to apply to the deployment - storage_document_id: an reference to the storage block document - used for the deployed flow - infrastructure_document_id: an reference to the infrastructure block document - to use for this deployment - job_variables: A dictionary of dot delimited infrastructure overrides that - will be applied at runtime; for example `env.CONFIG_KEY=config_value` or - `namespace='prefect'`. This argument was previously named `infra_overrides`. - Both arguments are supported for backwards compatibility. - - Raises: - httpx.RequestError: if the deployment was not created for any reason - - Returns: - the ID of the deployment in the backend - """ - - if parameter_openapi_schema is None: - parameter_openapi_schema = {} - deployment_create = DeploymentCreate( - flow_id=flow_id, - name=name, - version=version, - parameters=dict(parameters or {}), - tags=list(tags or []), - work_queue_name=work_queue_name, - description=description, - storage_document_id=storage_document_id, - path=path, - entrypoint=entrypoint, - infrastructure_document_id=infrastructure_document_id, - job_variables=dict(job_variables or {}), - parameter_openapi_schema=parameter_openapi_schema, - paused=paused, - schedules=schedules or [], - concurrency_limit=concurrency_limit, - concurrency_options=concurrency_options, - pull_steps=pull_steps, - enforce_parameter_schema=enforce_parameter_schema, - ) - - if work_pool_name is not None: - deployment_create.work_pool_name = work_pool_name - - # Exclude newer fields that are not set to avoid compatibility issues - exclude = { - field - for field in ["work_pool_name", "work_queue_name"] - if field not in deployment_create.model_fields_set - } - - if deployment_create.paused is None: - exclude.add("paused") - - if deployment_create.pull_steps is None: - exclude.add("pull_steps") - - if deployment_create.enforce_parameter_schema is None: - exclude.add("enforce_parameter_schema") - - json = deployment_create.model_dump(mode="json", exclude=exclude) - response = await self._client.post( - "/deployments/", - json=json, - ) - deployment_id = response.json().get("id") - if not deployment_id: - raise httpx.RequestError(f"Malformed response: {response}") - - return UUID(deployment_id) - - async def set_deployment_paused_state( - self, deployment_id: UUID, paused: bool - ) -> None: - await self._client.patch( - f"/deployments/{deployment_id}", json={"paused": paused} - ) - - async def update_deployment( - self, - deployment_id: UUID, - deployment: DeploymentUpdate, - ) -> None: - await self._client.patch( - f"/deployments/{deployment_id}", - json=deployment.model_dump(mode="json", exclude_unset=True), - ) - - async def _create_deployment_from_schema(self, schema: DeploymentCreate) -> UUID: - """ - Create a deployment from a prepared `DeploymentCreate` schema. - """ - # TODO: We are likely to remove this method once we have considered the - # packaging interface for deployments further. - response = await self._client.post( - "/deployments/", json=schema.model_dump(mode="json") - ) - deployment_id = response.json().get("id") - if not deployment_id: - raise httpx.RequestError(f"Malformed response: {response}") - - return UUID(deployment_id) - - async def read_deployment( - self, - deployment_id: Union[UUID, str], - ) -> DeploymentResponse: - """ - Query the Prefect API for a deployment by id. - - Args: - deployment_id: the deployment ID of interest - - Returns: - a [Deployment model][prefect.client.schemas.objects.Deployment] representation of the deployment - """ - if not isinstance(deployment_id, UUID): - try: - deployment_id = UUID(deployment_id) - except ValueError: - raise ValueError(f"Invalid deployment ID: {deployment_id}") - - try: - response = await self._client.get(f"/deployments/{deployment_id}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return DeploymentResponse.model_validate(response.json()) - - async def read_deployment_by_name( - self, - name: str, - ) -> DeploymentResponse: - """ - Query the Prefect API for a deployment by name. - - Args: - name: A deployed flow's name: / - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - a Deployment model representation of the deployment - """ - try: - response = await self._client.get(f"/deployments/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - from prefect.utilities.text import fuzzy_match_string - - deployments = await self.read_deployments() - flow_name_map = { - flow.id: flow.name - for flow in await asyncio.gather( - *[ - self.read_flow(flow_id) - for flow_id in {d.flow_id for d in deployments} - ] - ) - } - - raise prefect.exceptions.ObjectNotFound( - http_exc=e, - help_message=( - f"Deployment {name!r} not found; did you mean {fuzzy_match!r}?" - if ( - fuzzy_match := fuzzy_match_string( - name, - [ - f"{flow_name_map[d.flow_id]}/{d.name}" - for d in deployments - ], - ) - ) - else f"Deployment {name!r} not found. Try `prefect deployment ls` to find available deployments." - ), - ) from e - else: - raise - - return DeploymentResponse.model_validate(response.json()) - - async def read_deployments( - self, - *, - flow_filter: Optional[FlowFilter] = None, - flow_run_filter: Optional[FlowRunFilter] = None, - task_run_filter: Optional[TaskRunFilter] = None, - deployment_filter: Optional[DeploymentFilter] = None, - work_pool_filter: Optional[WorkPoolFilter] = None, - work_queue_filter: Optional[WorkQueueFilter] = None, - limit: Optional[int] = None, - sort: Optional[DeploymentSort] = None, - offset: int = 0, - ) -> list[DeploymentResponse]: - """ - Query the Prefect API for deployments. Only deployments matching all - the provided criteria will be returned. - - Args: - flow_filter: filter criteria for flows - flow_run_filter: filter criteria for flow runs - task_run_filter: filter criteria for task runs - deployment_filter: filter criteria for deployments - work_pool_filter: filter criteria for work pools - work_queue_filter: filter criteria for work pool queues - limit: a limit for the deployment query - offset: an offset for the deployment query - - Returns: - a list of Deployment model representations - of the deployments - """ - body: dict[str, Any] = { - "flows": flow_filter.model_dump(mode="json") if flow_filter else None, - "flow_runs": ( - flow_run_filter.model_dump(mode="json", exclude_unset=True) - if flow_run_filter - else None - ), - "task_runs": ( - task_run_filter.model_dump(mode="json") if task_run_filter else None - ), - "deployments": ( - deployment_filter.model_dump(mode="json") if deployment_filter else None - ), - "work_pools": ( - work_pool_filter.model_dump(mode="json") if work_pool_filter else None - ), - "work_pool_queues": ( - work_queue_filter.model_dump(mode="json") if work_queue_filter else None - ), - "limit": limit, - "offset": offset, - "sort": sort, - } - - response = await self._client.post("/deployments/filter", json=body) - return pydantic.TypeAdapter(list[DeploymentResponse]).validate_python( - response.json() - ) - - async def delete_deployment( - self, - deployment_id: UUID, - ) -> None: - """ - Delete deployment by id. - - Args: - deployment_id: The deployment id of interest. - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If requests fails - """ - try: - await self._client.delete(f"/deployments/{deployment_id}") - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def create_deployment_schedules( - self, - deployment_id: UUID, - schedules: list[tuple[SCHEDULE_TYPES, bool]], - ) -> list[DeploymentSchedule]: - """ - Create deployment schedules. - - Args: - deployment_id: the deployment ID - schedules: a list of tuples containing the schedule to create - and whether or not it should be active. - - Raises: - httpx.RequestError: if the schedules were not created for any reason - - Returns: - the list of schedules created in the backend - """ - deployment_schedule_create = [ - DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1]) - for schedule in schedules - ] - - json = [ - deployment_schedule_create.model_dump(mode="json") - for deployment_schedule_create in deployment_schedule_create - ] - response = await self._client.post( - f"/deployments/{deployment_id}/schedules", json=json - ) - return pydantic.TypeAdapter(list[DeploymentSchedule]).validate_python( - response.json() - ) - - async def read_deployment_schedules( - self, - deployment_id: UUID, - ) -> list[DeploymentSchedule]: - """ - Query the Prefect API for a deployment's schedules. - - Args: - deployment_id: the deployment ID - - Returns: - a list of DeploymentSchedule model representations of the deployment schedules - """ - try: - response = await self._client.get(f"/deployments/{deployment_id}/schedules") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return pydantic.TypeAdapter(list[DeploymentSchedule]).validate_python( - response.json() - ) - - async def update_deployment_schedule( - self, - deployment_id: UUID, - schedule_id: UUID, - active: Optional[bool] = None, - schedule: Optional[SCHEDULE_TYPES] = None, - ) -> None: - """ - Update a deployment schedule by ID. - - Args: - deployment_id: the deployment ID - schedule_id: the deployment schedule ID of interest - active: whether or not the schedule should be active - schedule: the cron, rrule, or interval schedule this deployment schedule should use - """ - kwargs: dict[str, Any] = {} - if active is not None: - kwargs["active"] = active - if schedule is not None: - kwargs["schedule"] = schedule - - deployment_schedule_update = DeploymentScheduleUpdate(**kwargs) - json = deployment_schedule_update.model_dump(mode="json", exclude_unset=True) - - try: - await self._client.patch( - f"/deployments/{deployment_id}/schedules/{schedule_id}", json=json - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - async def delete_deployment_schedule( - self, - deployment_id: UUID, - schedule_id: UUID, - ) -> None: - """ - Delete a deployment schedule. - - Args: - deployment_id: the deployment ID - schedule_id: the ID of the deployment schedule to delete. - - Raises: - httpx.RequestError: if the schedules were not deleted for any reason - """ - try: - await self._client.delete( - f"/deployments/{deployment_id}/schedules/{schedule_id}" - ) - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - async def read_flow_run(self, flow_run_id: UUID) -> FlowRun: """ Query the Prefect API for a flow run by id. @@ -2666,27 +2170,6 @@ async def read_work_queues( return pydantic.TypeAdapter(list[WorkQueue]).validate_python(response.json()) - async def get_scheduled_flow_runs_for_deployments( - self, - deployment_ids: list[UUID], - scheduled_before: Optional[datetime.datetime] = None, - limit: Optional[int] = None, - ) -> list[FlowRunResponse]: - body: dict[str, Any] = dict(deployment_ids=[str(id) for id in deployment_ids]) - if scheduled_before: - body["scheduled_before"] = str(scheduled_before) - if limit: - body["limit"] = limit - - response = await self._client.post( - "/deployments/get_scheduled_flow_runs", - json=body, - ) - - return pydantic.TypeAdapter(list[FlowRunResponse]).validate_python( - response.json() - ) - async def get_scheduled_flow_runs_for_work_pool( self, work_pool_name: str, @@ -3026,6 +2509,7 @@ class SyncPrefectClient( LogClient, VariableClient, ConcurrencyLimitClient, + DeploymentClient, ): """ A synchronous client for interacting with the [Prefect REST API](/api-ref/rest-api/). @@ -3760,55 +3244,6 @@ def read_task_run_states(self, task_run_id: UUID) -> list[prefect.states.State]: response.json() ) - def read_deployment( - self, - deployment_id: UUID, - ) -> DeploymentResponse: - """ - Query the Prefect API for a deployment by id. - - Args: - deployment_id: the deployment ID of interest - - Returns: - a [Deployment model][prefect.client.schemas.objects.Deployment] representation of the deployment - """ - try: - response = self._client.get(f"/deployments/{deployment_id}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - return DeploymentResponse.model_validate(response.json()) - - def read_deployment_by_name( - self, - name: str, - ) -> DeploymentResponse: - """ - Query the Prefect API for a deployment by name. - - Args: - name: A deployed flow's name: / - - Raises: - prefect.exceptions.ObjectNotFound: If request returns 404 - httpx.RequestError: If request fails - - Returns: - a Deployment model representation of the deployment - """ - try: - response = self._client.get(f"/deployments/name/{name}") - except httpx.HTTPStatusError as e: - if e.response.status_code == status.HTTP_404_NOT_FOUND: - raise prefect.exceptions.ObjectNotFound(http_exc=e) from e - else: - raise - - return DeploymentResponse.model_validate(response.json()) - def update_flow_run_labels( self, flow_run_id: UUID, labels: KeyValueLabelsField ) -> None: diff --git a/src/prefect/client/orchestration/_deployments/__init__.py b/src/prefect/client/orchestration/_deployments/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/client/orchestration/_deployments/client.py b/src/prefect/client/orchestration/_deployments/client.py new file mode 100644 index 000000000000..dbf5b5d31536 --- /dev/null +++ b/src/prefect/client/orchestration/_deployments/client.py @@ -0,0 +1,1125 @@ +from __future__ import annotations + +from collections.abc import Iterable +from typing import TYPE_CHECKING, Any, Union + +from httpx import HTTPStatusError, RequestError + +from prefect.client.orchestration.base import BaseAsyncClient, BaseClient +from prefect.exceptions import ObjectNotFound +from prefect.states import Scheduled + +if TYPE_CHECKING: + import datetime + from uuid import UUID + + from prefect.client.schemas import FlowRun + from prefect.client.schemas.actions import ( + DeploymentCreate, + DeploymentScheduleCreate, + DeploymentUpdate, + ) + from prefect.client.schemas.filters import ( + DeploymentFilter, + FlowFilter, + FlowRunFilter, + TaskRunFilter, + WorkPoolFilter, + WorkQueueFilter, + ) + from prefect.client.schemas.objects import ( + ConcurrencyOptions, + DeploymentSchedule, + ) + from prefect.client.schemas.responses import ( + DeploymentResponse, + FlowRunResponse, + ) + from prefect.client.schemas.schedules import SCHEDULE_TYPES + from prefect.client.schemas.sorting import ( + DeploymentSort, + ) + from prefect.states import State + from prefect.types import KeyValueLabelsField + + +class DeploymentClient(BaseClient): + def create_deployment( + self, + flow_id: "UUID", + name: str, + version: str | None = None, + schedules: list["DeploymentScheduleCreate"] | None = None, + concurrency_limit: int | None = None, + concurrency_options: "ConcurrencyOptions | None" = None, + parameters: dict[str, Any] | None = None, + description: str | None = None, + work_queue_name: str | None = None, + work_pool_name: str | None = None, + tags: list[str] | None = None, + storage_document_id: "UUID | None" = None, + path: str | None = None, + entrypoint: str | None = None, + infrastructure_document_id: "UUID | None" = None, + parameter_openapi_schema: dict[str, Any] | None = None, + paused: bool | None = None, + pull_steps: list[dict[str, Any]] | None = None, + enforce_parameter_schema: bool | None = None, + job_variables: dict[str, Any] | None = None, + ) -> "UUID": + """ + Create a deployment. + + Args: + flow_id: the flow ID to create a deployment for + name: the name of the deployment + version: an optional version string for the deployment + tags: an optional list of tags to apply to the deployment + storage_document_id: an reference to the storage block document + used for the deployed flow + infrastructure_document_id: an reference to the infrastructure block document + to use for this deployment + job_variables: A dictionary of dot delimited infrastructure overrides that + will be applied at runtime; for example `env.CONFIG_KEY=config_value` or + `namespace='prefect'`. This argument was previously named `infra_overrides`. + Both arguments are supported for backwards compatibility. + + Raises: + RequestError: if the deployment was not created for any reason + + Returns: + the ID of the deployment in the backend + """ + from uuid import UUID + + from prefect.client.schemas.actions import DeploymentCreate + + if parameter_openapi_schema is None: + parameter_openapi_schema = {} + deployment_create = DeploymentCreate( + flow_id=flow_id, + name=name, + version=version, + parameters=dict(parameters or {}), + tags=list(tags or []), + work_queue_name=work_queue_name, + description=description, + storage_document_id=storage_document_id, + path=path, + entrypoint=entrypoint, + infrastructure_document_id=infrastructure_document_id, + job_variables=dict(job_variables or {}), + parameter_openapi_schema=parameter_openapi_schema, + paused=paused, + schedules=schedules or [], + concurrency_limit=concurrency_limit, + concurrency_options=concurrency_options, + pull_steps=pull_steps, + enforce_parameter_schema=enforce_parameter_schema, + ) + + if work_pool_name is not None: + deployment_create.work_pool_name = work_pool_name + + # Exclude newer fields that are not set to avoid compatibility issues + exclude = { + field + for field in ["work_pool_name", "work_queue_name"] + if field not in deployment_create.model_fields_set + } + + if deployment_create.paused is None: + exclude.add("paused") + + if deployment_create.pull_steps is None: + exclude.add("pull_steps") + + if deployment_create.enforce_parameter_schema is None: + exclude.add("enforce_parameter_schema") + + json = deployment_create.model_dump(mode="json", exclude=exclude) + response = self.request( + "POST", + "/deployments/", + json=json, + ) + deployment_id = response.json().get("id") + if not deployment_id: + raise RequestError(f"Malformed response: {response}") + + return UUID(deployment_id) + + def set_deployment_paused_state(self, deployment_id: "UUID", paused: bool) -> None: + self.request( + "PATCH", + "/deployments/{id}", + path_params={"id": deployment_id}, + json={"paused": paused}, + ) + + def update_deployment( + self, + deployment_id: "UUID", + deployment: "DeploymentUpdate", + ) -> None: + self.request( + "PATCH", + "/deployments/{id}", + path_params={"id": deployment_id}, + json=deployment.model_dump(mode="json", exclude_unset=True), + ) + + def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> "UUID": + """ + Create a deployment from a prepared `DeploymentCreate` schema. + """ + from uuid import UUID + + # TODO: We are likely to remove this method once we have considered the + # packaging interface for deployments further. + response = self.request( + "POST", "/deployments/", json=schema.model_dump(mode="json") + ) + deployment_id = response.json().get("id") + if not deployment_id: + raise RequestError(f"Malformed response: {response}") + + return UUID(deployment_id) + + def read_deployment( + self, + deployment_id: Union["UUID", str], + ) -> "DeploymentResponse": + """ + Query the Prefect API for a deployment by id. + + Args: + deployment_id: the deployment ID of interest + + Returns: + a [Deployment model][prefect.client.schemas.objects.Deployment] representation of the deployment + """ + from uuid import UUID + + from prefect.client.schemas.responses import DeploymentResponse + + if not isinstance(deployment_id, UUID): + try: + deployment_id = UUID(deployment_id) + except ValueError: + raise ValueError(f"Invalid deployment ID: {deployment_id}") + + try: + response = self.request( + "GET", + "/deployments/{id}", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return DeploymentResponse.model_validate(response.json()) + + def read_deployment_by_name( + self, + name: str, + ) -> "DeploymentResponse": + """ + Query the Prefect API for a deployment by name. + + Args: + name: A deployed flow's name: / + + Raises: + ObjectNotFound: If request returns 404 + RequestError: If request fails + + Returns: + a Deployment model representation of the deployment + """ + from prefect.client.schemas.responses import DeploymentResponse + + try: + flow_name, deployment_name = name.split("/") + response = self.request( + "GET", + "/deployments/name/{flow_name}/{deployment_name}", + path_params={ + "flow_name": flow_name, + "deployment_name": deployment_name, + }, + ) + except (HTTPStatusError, ValueError) as e: + if isinstance(e, HTTPStatusError) and e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + elif isinstance(e, ValueError): + raise ValueError( + f"Invalid deployment name format: {name}. Expected format: /" + ) from e + else: + raise + + return DeploymentResponse.model_validate(response.json()) + + def read_deployments( + self, + *, + flow_filter: "FlowFilter | None" = None, + flow_run_filter: "FlowRunFilter | None" = None, + task_run_filter: "TaskRunFilter | None" = None, + deployment_filter: "DeploymentFilter | None" = None, + work_pool_filter: "WorkPoolFilter | None" = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + sort: "DeploymentSort | None" = None, + offset: int = 0, + ) -> list["DeploymentResponse"]: + """ + Query the Prefect API for deployments. Only deployments matching all + the provided criteria will be returned. + + Args: + flow_filter: filter criteria for flows + flow_run_filter: filter criteria for flow runs + task_run_filter: filter criteria for task runs + deployment_filter: filter criteria for deployments + work_pool_filter: filter criteria for work pools + work_queue_filter: filter criteria for work pool queues + limit: a limit for the deployment query + offset: an offset for the deployment query + + Returns: + a list of Deployment model representations + of the deployments + """ + from prefect.client.schemas.responses import DeploymentResponse + + body: dict[str, Any] = { + "flows": flow_filter.model_dump(mode="json") if flow_filter else None, + "flow_runs": ( + flow_run_filter.model_dump(mode="json", exclude_unset=True) + if flow_run_filter + else None + ), + "task_runs": ( + task_run_filter.model_dump(mode="json") if task_run_filter else None + ), + "deployments": ( + deployment_filter.model_dump(mode="json") if deployment_filter else None + ), + "work_pools": ( + work_pool_filter.model_dump(mode="json") if work_pool_filter else None + ), + "work_pool_queues": ( + work_queue_filter.model_dump(mode="json") if work_queue_filter else None + ), + "limit": limit, + "offset": offset, + "sort": sort, + } + + response = self.request("POST", "/deployments/filter", json=body) + return DeploymentResponse.model_validate_list(response.json()) + + def delete_deployment( + self, + deployment_id: "UUID", + ) -> None: + """ + Delete deployment by id. + + Args: + deployment_id: The deployment id of interest. + Raises: + ObjectNotFound: If request returns 404 + RequestError: If requests fails + """ + try: + self.request( + "DELETE", + "/deployments/{id}", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def create_deployment_schedules( + self, + deployment_id: "UUID", + schedules: list[tuple["SCHEDULE_TYPES", bool]], + ) -> list["DeploymentSchedule"]: + """ + Create deployment schedules. + + Args: + deployment_id: the deployment ID + schedules: a list of tuples containing the schedule to create + and whether or not it should be active. + + Raises: + RequestError: if the schedules were not created for any reason + + Returns: + the list of schedules created in the backend + """ + from prefect.client.schemas.actions import DeploymentScheduleCreate + from prefect.client.schemas.objects import DeploymentSchedule + + deployment_schedule_create = [ + DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1]) + for schedule in schedules + ] + + json = [ + deployment_schedule_create.model_dump(mode="json") + for deployment_schedule_create in deployment_schedule_create + ] + response = self.request( + "POST", + "/deployments/{id}/schedules", + path_params={"id": deployment_id}, + json=json, + ) + return DeploymentSchedule.model_validate_list(response.json()) + + def read_deployment_schedules( + self, + deployment_id: "UUID", + ) -> list["DeploymentSchedule"]: + """ + Query the Prefect API for a deployment's schedules. + + Args: + deployment_id: the deployment ID + + Returns: + a list of DeploymentSchedule model representations of the deployment schedules + """ + from prefect.client.schemas.objects import DeploymentSchedule + + try: + response = self.request( + "GET", + "/deployments/{id}/schedules", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return DeploymentSchedule.model_validate_list(response.json()) + + def update_deployment_schedule( + self, + deployment_id: "UUID", + schedule_id: "UUID", + active: bool | None = None, + schedule: "SCHEDULE_TYPES | None" = None, + ) -> None: + """ + Update a deployment schedule by ID. + + Args: + deployment_id: the deployment ID + schedule_id: the deployment schedule ID of interest + active: whether or not the schedule should be active + schedule: the cron, rrule, or interval schedule this deployment schedule should use + """ + from prefect.client.schemas.actions import DeploymentScheduleUpdate + + kwargs: dict[str, Any] = {} + if active is not None: + kwargs["active"] = active + if schedule is not None: + kwargs["schedule"] = schedule + + deployment_schedule_update = DeploymentScheduleUpdate(**kwargs) + json = deployment_schedule_update.model_dump(mode="json", exclude_unset=True) + + try: + self.request( + "PATCH", + "/deployments/{id}/schedules/{schedule_id}", + path_params={"id": deployment_id, "schedule_id": schedule_id}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def delete_deployment_schedule( + self, + deployment_id: "UUID", + schedule_id: "UUID", + ) -> None: + """ + Delete a deployment schedule. + + Args: + deployment_id: the deployment ID + schedule_id: the ID of the deployment schedule to delete. + + Raises: + RequestError: if the schedules were not deleted for any reason + """ + try: + self.request( + "DELETE", + "/deployments/{id}/schedules/{schedule_id}", + path_params={"id": deployment_id, "schedule_id": schedule_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + def get_scheduled_flow_runs_for_deployments( + self, + deployment_ids: list["UUID"], + scheduled_before: "datetime.datetime | None" = None, + limit: int | None = None, + ) -> list["FlowRunResponse"]: + from prefect.client.schemas.responses import FlowRunResponse + + body: dict[str, Any] = dict(deployment_ids=[str(id) for id in deployment_ids]) + if scheduled_before: + body["scheduled_before"] = str(scheduled_before) + if limit: + body["limit"] = limit + + response = self.request( + "POST", + "/deployments/get_scheduled_flow_runs", + json=body, + ) + + return FlowRunResponse.model_validate_list(response.json()) + + def create_flow_run_from_deployment( + self, + deployment_id: "UUID", + *, + parameters: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, + state: State[Any] | None = None, + name: str | None = None, + tags: Iterable[str] | None = None, + idempotency_key: str | None = None, + parent_task_run_id: "UUID | None" = None, + work_queue_name: str | None = None, + job_variables: dict[str, Any] | None = None, + labels: "KeyValueLabelsField | None" = None, + ) -> "FlowRun": + """ + Create a flow run for a deployment. + + Args: + deployment_id: The deployment ID to create the flow run from + parameters: Parameter overrides for this flow run. Merged with the + deployment defaults + context: Optional run context data + state: The initial state for the run. If not provided, defaults to + `Scheduled` for now. Should always be a `Scheduled` type. + name: An optional name for the flow run. If not provided, the server will + generate a name. + tags: An optional iterable of tags to apply to the flow run; these tags + are merged with the deployment's tags. + idempotency_key: Optional idempotency key for creation of the flow run. + If the key matches the key of an existing flow run, the existing run will + be returned instead of creating a new one. + parent_task_run_id: if a subflow run is being created, the placeholder task + run identifier in the parent flow + work_queue_name: An optional work queue name to add this run to. If not provided, + will default to the deployment's set work queue. If one is provided that does not + exist, a new work queue will be created within the deployment's work pool. + job_variables: Optional variables that will be supplied to the flow run job. + + Raises: + RequestError: if the Prefect API does not successfully create a run for any reason + + Returns: + The flow run model + """ + from prefect.client.schemas.actions import DeploymentFlowRunCreate + from prefect.client.schemas.objects import FlowRun + + parameters = parameters or {} + context = context or {} + state = state or Scheduled() + tags = tags or [] + + flow_run_create = DeploymentFlowRunCreate( + parameters=parameters, + context=context, + state=state.to_state_create(), + tags=list(tags), + name=name, + idempotency_key=idempotency_key, + parent_task_run_id=parent_task_run_id, + job_variables=job_variables, + labels=labels, + ) + + # done separately to avoid including this field in payloads sent to older API versions + if work_queue_name: + flow_run_create.work_queue_name = work_queue_name + + response = self.request( + "POST", + "/deployments/{id}/create_flow_run", + path_params={"id": deployment_id}, + json=flow_run_create.model_dump(mode="json", exclude_unset=True), + ) + return FlowRun.model_validate(response.json()) + + +class DeploymentAsyncClient(BaseAsyncClient): + async def create_deployment( + self, + flow_id: "UUID", + name: str, + version: str | None = None, + schedules: list["DeploymentScheduleCreate"] | None = None, + concurrency_limit: int | None = None, + concurrency_options: "ConcurrencyOptions | None" = None, + parameters: dict[str, Any] | None = None, + description: str | None = None, + work_queue_name: str | None = None, + work_pool_name: str | None = None, + tags: list[str] | None = None, + storage_document_id: "UUID | None" = None, + path: str | None = None, + entrypoint: str | None = None, + infrastructure_document_id: "UUID | None" = None, + parameter_openapi_schema: dict[str, Any] | None = None, + paused: bool | None = None, + pull_steps: list[dict[str, Any]] | None = None, + enforce_parameter_schema: bool | None = None, + job_variables: dict[str, Any] | None = None, + ) -> "UUID": + """ + Create a deployment. + + Args: + flow_id: the flow ID to create a deployment for + name: the name of the deployment + version: an optional version string for the deployment + tags: an optional list of tags to apply to the deployment + storage_document_id: an reference to the storage block document + used for the deployed flow + infrastructure_document_id: an reference to the infrastructure block document + to use for this deployment + job_variables: A dictionary of dot delimited infrastructure overrides that + will be applied at runtime; for example `env.CONFIG_KEY=config_value` or + `namespace='prefect'`. This argument was previously named `infra_overrides`. + Both arguments are supported for backwards compatibility. + + Raises: + RequestError: if the deployment was not created for any reason + + Returns: + the ID of the deployment in the backend + """ + from uuid import UUID + + from prefect.client.schemas.actions import DeploymentCreate + + if parameter_openapi_schema is None: + parameter_openapi_schema = {} + deployment_create = DeploymentCreate( + flow_id=flow_id, + name=name, + version=version, + parameters=dict(parameters or {}), + tags=list(tags or []), + work_queue_name=work_queue_name, + description=description, + storage_document_id=storage_document_id, + path=path, + entrypoint=entrypoint, + infrastructure_document_id=infrastructure_document_id, + job_variables=dict(job_variables or {}), + parameter_openapi_schema=parameter_openapi_schema, + paused=paused, + schedules=schedules or [], + concurrency_limit=concurrency_limit, + concurrency_options=concurrency_options, + pull_steps=pull_steps, + enforce_parameter_schema=enforce_parameter_schema, + ) + + if work_pool_name is not None: + deployment_create.work_pool_name = work_pool_name + + # Exclude newer fields that are not set to avoid compatibility issues + exclude = { + field + for field in ["work_pool_name", "work_queue_name"] + if field not in deployment_create.model_fields_set + } + + if deployment_create.paused is None: + exclude.add("paused") + + if deployment_create.pull_steps is None: + exclude.add("pull_steps") + + if deployment_create.enforce_parameter_schema is None: + exclude.add("enforce_parameter_schema") + + json = deployment_create.model_dump(mode="json", exclude=exclude) + response = await self.request( + "POST", + "/deployments/", + json=json, + ) + deployment_id = response.json().get("id") + if not deployment_id: + raise RequestError(f"Malformed response: {response}") + + return UUID(deployment_id) + + async def set_deployment_paused_state( + self, deployment_id: "UUID", paused: bool + ) -> None: + await self.request( + "PATCH", + "/deployments/{id}", + path_params={"id": deployment_id}, + json={"paused": paused}, + ) + + async def update_deployment( + self, + deployment_id: "UUID", + deployment: "DeploymentUpdate", + ) -> None: + await self.request( + "PATCH", + "/deployments/{id}", + path_params={"id": deployment_id}, + json=deployment.model_dump(mode="json", exclude_unset=True), + ) + + async def _create_deployment_from_schema( + self, schema: "DeploymentCreate" + ) -> "UUID": + """ + Create a deployment from a prepared `DeploymentCreate` schema. + """ + from uuid import UUID + + # TODO: We are likely to remove this method once we have considered the + # packaging interface for deployments further. + response = await self.request( + "POST", "/deployments/", json=schema.model_dump(mode="json") + ) + deployment_id = response.json().get("id") + if not deployment_id: + raise RequestError(f"Malformed response: {response}") + + return UUID(deployment_id) + + async def read_deployment( + self, + deployment_id: Union["UUID", str], + ) -> "DeploymentResponse": + """ + Query the Prefect API for a deployment by id. + + Args: + deployment_id: the deployment ID of interest + + Returns: + a [Deployment model][prefect.client.schemas.objects.Deployment] representation of the deployment + """ + from uuid import UUID + + from prefect.client.schemas.responses import DeploymentResponse + + if not isinstance(deployment_id, UUID): + try: + deployment_id = UUID(deployment_id) + except ValueError: + raise ValueError(f"Invalid deployment ID: {deployment_id}") + + try: + response = await self.request( + "GET", + "/deployments/{id}", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return DeploymentResponse.model_validate(response.json()) + + async def read_deployment_by_name( + self, + name: str, + ) -> "DeploymentResponse": + """ + Query the Prefect API for a deployment by name. + + Args: + name: A deployed flow's name: / + + Raises: + ObjectNotFound: If request returns 404 + RequestError: If request fails + + Returns: + a Deployment model representation of the deployment + """ + from prefect.client.schemas.responses import DeploymentResponse + + try: + flow_name, deployment_name = name.split("/") + response = await self.request( + "GET", + "/deployments/name/{flow_name}/{deployment_name}", + path_params={ + "flow_name": flow_name, + "deployment_name": deployment_name, + }, + ) + except (HTTPStatusError, ValueError) as e: + if isinstance(e, HTTPStatusError) and e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + elif isinstance(e, ValueError): + raise ValueError( + f"Invalid deployment name format: {name}. Expected format: /" + ) from e + else: + raise + + return DeploymentResponse.model_validate(response.json()) + + async def read_deployments( + self, + *, + flow_filter: "FlowFilter | None" = None, + flow_run_filter: "FlowRunFilter | None" = None, + task_run_filter: "TaskRunFilter | None" = None, + deployment_filter: "DeploymentFilter | None" = None, + work_pool_filter: "WorkPoolFilter | None" = None, + work_queue_filter: "WorkQueueFilter | None" = None, + limit: int | None = None, + sort: "DeploymentSort | None" = None, + offset: int = 0, + ) -> list["DeploymentResponse"]: + """ + Query the Prefect API for deployments. Only deployments matching all + the provided criteria will be returned. + + Args: + flow_filter: filter criteria for flows + flow_run_filter: filter criteria for flow runs + task_run_filter: filter criteria for task runs + deployment_filter: filter criteria for deployments + work_pool_filter: filter criteria for work pools + work_queue_filter: filter criteria for work pool queues + limit: a limit for the deployment query + offset: an offset for the deployment query + + Returns: + a list of Deployment model representations + of the deployments + """ + from prefect.client.schemas.responses import DeploymentResponse + + body: dict[str, Any] = { + "flows": flow_filter.model_dump(mode="json") if flow_filter else None, + "flow_runs": ( + flow_run_filter.model_dump(mode="json", exclude_unset=True) + if flow_run_filter + else None + ), + "task_runs": ( + task_run_filter.model_dump(mode="json") if task_run_filter else None + ), + "deployments": ( + deployment_filter.model_dump(mode="json") if deployment_filter else None + ), + "work_pools": ( + work_pool_filter.model_dump(mode="json") if work_pool_filter else None + ), + "work_pool_queues": ( + work_queue_filter.model_dump(mode="json") if work_queue_filter else None + ), + "limit": limit, + "offset": offset, + "sort": sort, + } + + response = await self.request("POST", "/deployments/filter", json=body) + return DeploymentResponse.model_validate_list(response.json()) + + async def delete_deployment( + self, + deployment_id: "UUID", + ) -> None: + """ + Delete deployment by id. + + Args: + deployment_id: The deployment id of interest. + Raises: + ObjectNotFound: If request returns 404 + RequestError: If requests fails + """ + try: + await self.request( + "DELETE", + "/deployments/{id}", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def create_deployment_schedules( + self, + deployment_id: "UUID", + schedules: list[tuple["SCHEDULE_TYPES", bool]], + ) -> list["DeploymentSchedule"]: + """ + Create deployment schedules. + + Args: + deployment_id: the deployment ID + schedules: a list of tuples containing the schedule to create + and whether or not it should be active. + + Raises: + RequestError: if the schedules were not created for any reason + + Returns: + the list of schedules created in the backend + """ + from prefect.client.schemas.actions import DeploymentScheduleCreate + from prefect.client.schemas.objects import DeploymentSchedule + + deployment_schedule_create = [ + DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1]) + for schedule in schedules + ] + + json = [ + deployment_schedule_create.model_dump(mode="json") + for deployment_schedule_create in deployment_schedule_create + ] + response = await self.request( + "POST", + "/deployments/{id}/schedules", + path_params={"id": deployment_id}, + json=json, + ) + return DeploymentSchedule.model_validate_list(response.json()) + + async def read_deployment_schedules( + self, + deployment_id: "UUID", + ) -> list["DeploymentSchedule"]: + """ + Query the Prefect API for a deployment's schedules. + + Args: + deployment_id: the deployment ID + + Returns: + a list of DeploymentSchedule model representations of the deployment schedules + """ + from prefect.client.schemas.objects import DeploymentSchedule + + try: + response = await self.request( + "GET", + "/deployments/{id}/schedules", + path_params={"id": deployment_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + return DeploymentSchedule.model_validate_list(response.json()) + + async def update_deployment_schedule( + self, + deployment_id: "UUID", + schedule_id: "UUID", + active: bool | None = None, + schedule: "SCHEDULE_TYPES | None" = None, + ) -> None: + """ + Update a deployment schedule by ID. + + Args: + deployment_id: the deployment ID + schedule_id: the deployment schedule ID of interest + active: whether or not the schedule should be active + schedule: the cron, rrule, or interval schedule this deployment schedule should use + """ + from prefect.client.schemas.actions import DeploymentScheduleUpdate + + kwargs: dict[str, Any] = {} + if active is not None: + kwargs["active"] = active + if schedule is not None: + kwargs["schedule"] = schedule + + deployment_schedule_update = DeploymentScheduleUpdate(**kwargs) + json = deployment_schedule_update.model_dump(mode="json", exclude_unset=True) + + try: + await self.request( + "PATCH", + "/deployments/{id}/schedules/{schedule_id}", + path_params={"id": deployment_id, "schedule_id": schedule_id}, + json=json, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def delete_deployment_schedule( + self, + deployment_id: "UUID", + schedule_id: "UUID", + ) -> None: + """ + Delete a deployment schedule. + + Args: + deployment_id: the deployment ID + schedule_id: the ID of the deployment schedule to delete. + + Raises: + RequestError: if the schedules were not deleted for any reason + """ + try: + await self.request( + "DELETE", + "/deployments/{id}/schedules/{schedule_id}", + path_params={"id": deployment_id, "schedule_id": schedule_id}, + ) + except HTTPStatusError as e: + if e.response.status_code == 404: + raise ObjectNotFound(http_exc=e) from e + else: + raise + + async def get_scheduled_flow_runs_for_deployments( + self, + deployment_ids: list["UUID"], + scheduled_before: "datetime.datetime | None" = None, + limit: int | None = None, + ) -> list["FlowRunResponse"]: + from prefect.client.schemas.responses import FlowRunResponse + + body: dict[str, Any] = dict(deployment_ids=[str(id) for id in deployment_ids]) + if scheduled_before: + body["scheduled_before"] = str(scheduled_before) + if limit: + body["limit"] = limit + + response = await self.request( + "POST", + "/deployments/get_scheduled_flow_runs", + json=body, + ) + + return FlowRunResponse.model_validate_list(response.json()) + + async def create_flow_run_from_deployment( + self, + deployment_id: "UUID", + *, + parameters: dict[str, Any] | None = None, + context: dict[str, Any] | None = None, + state: State[Any] | None = None, + name: str | None = None, + tags: Iterable[str] | None = None, + idempotency_key: str | None = None, + parent_task_run_id: "UUID | None" = None, + work_queue_name: str | None = None, + job_variables: dict[str, Any] | None = None, + labels: "KeyValueLabelsField | None" = None, + ) -> "FlowRun": + """ + Create a flow run for a deployment. + + Args: + deployment_id: The deployment ID to create the flow run from + parameters: Parameter overrides for this flow run. Merged with the + deployment defaults + context: Optional run context data + state: The initial state for the run. If not provided, defaults to + `Scheduled` for now. Should always be a `Scheduled` type. + name: An optional name for the flow run. If not provided, the server will + generate a name. + tags: An optional iterable of tags to apply to the flow run; these tags + are merged with the deployment's tags. + idempotency_key: Optional idempotency key for creation of the flow run. + If the key matches the key of an existing flow run, the existing run will + be returned instead of creating a new one. + parent_task_run_id: if a subflow run is being created, the placeholder task + run identifier in the parent flow + work_queue_name: An optional work queue name to add this run to. If not provided, + will default to the deployment's set work queue. If one is provided that does not + exist, a new work queue will be created within the deployment's work pool. + job_variables: Optional variables that will be supplied to the flow run job. + + Raises: + RequestError: if the Prefect API does not successfully create a run for any reason + + Returns: + The flow run model + """ + from prefect.client.schemas.actions import DeploymentFlowRunCreate + from prefect.client.schemas.objects import FlowRun + + parameters = parameters or {} + context = context or {} + state = state or Scheduled() + tags = tags or [] + + flow_run_create = DeploymentFlowRunCreate( + parameters=parameters, + context=context, + state=state.to_state_create(), + tags=list(tags), + name=name, + idempotency_key=idempotency_key, + parent_task_run_id=parent_task_run_id, + job_variables=job_variables, + labels=labels, + ) + + # done separately to avoid including this field in payloads sent to older API versions + if work_queue_name: + flow_run_create.work_queue_name = work_queue_name + + response = await self.request( + "POST", + "/deployments/{id}/create_flow_run", + path_params={"id": deployment_id}, + json=flow_run_create.model_dump(mode="json", exclude_unset=True), + ) + return FlowRun.model_validate(response.json()) diff --git a/tests/client/test_prefect_client.py b/tests/client/test_prefect_client.py index fb3a0dd056b5..438371ea2f3d 100644 --- a/tests/client/test_prefect_client.py +++ b/tests/client/test_prefect_client.py @@ -841,27 +841,6 @@ def moo_deng(): assert len(deployment_responses) == 0 -async def test_read_deployment_by_name_fails_with_helpful_suggestion(prefect_client): - """this is a regression test for https://github.com/PrefectHQ/prefect/issues/15571""" - - @flow - def moo_deng(): - pass - - flow_id = await prefect_client.create_flow(moo_deng) - - await prefect_client.create_deployment( - flow_id=flow_id, - name="moisturized-deployment", - ) - - with pytest.raises( - prefect.exceptions.ObjectNotFound, - match="Deployment 'moo_deng/moisturized-deployment' not found; did you mean 'moo-deng/moisturized-deployment'?", - ): - await prefect_client.read_deployment_by_name("moo_deng/moisturized-deployment") - - async def test_create_then_delete_deployment(prefect_client): @flow def foo(): @@ -880,7 +859,7 @@ def foo(): async def test_read_nonexistent_deployment_by_name(prefect_client): - with pytest.raises(prefect.exceptions.ObjectNotFound): + with pytest.raises((prefect.exceptions.ObjectNotFound, ValueError)): await prefect_client.read_deployment_by_name("not-a-real-deployment")