Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/ui/src/queries/gridViewQueryKeys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import {
UseDagServiceGetDagDetailsKeyFn,
UseDagServiceGetLatestRunInfoKeyFn,
UseGridServiceGetGridRunsKeyFn,
useTaskInstanceServiceGetExtraLinksKey,
useTaskInstanceServiceGetLogKey,
UseTaskInstanceServiceGetTaskInstancesKeyFn,
useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
} from "openapi/queries";

export const gridQueryKeys = (dagId: string) =>
Expand All @@ -32,3 +35,10 @@ export const gridQueryKeys = (dagId: string) =>
UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]),
UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]),
] as const;

/** Prefix keys for per-attempt TI caches that become stale after any mutation. */
export const tiPerAttemptQueryKeys = [
[useTaskInstanceServiceGetLogKey],
[useTaskInstanceServiceGetExtraLinksKey],
[useTaskInstanceServiceGetTaskInstanceTryDetailsKey],
] as const;
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ import { useQueryClient } from "@tanstack/react-query";
import { useState } from "react";
import { useTranslation } from "react-i18next";

import { useDagRunServiceGetDagRunsKey, useTaskInstanceServiceGetTaskInstancesKey } from "openapi/queries";
import {
useDagRunServiceGetDagRunsKey,
useTaskInstanceServiceGetMappedTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import { TaskInstanceService } from "openapi/requests/services.gen";
import type { TaskInstanceResponse } from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";

import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";

type Props = {
readonly clearSelections: VoidFunction;
readonly onSuccessConfirm: VoidFunction;
Expand All @@ -46,10 +52,15 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }:
const [isPending, setIsPending] = useState(false);
const { t: translate } = useTranslation(["common", "dags"]);

const invalidateQueries = async () => {
const invalidateQueries = async (dagIds: ReadonlySet<string>) => {
await Promise.all([
queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetMappedTaskInstanceKey] }),
...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
...[...dagIds].flatMap((dagId) =>
gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
),
]);
};

Expand Down Expand Up @@ -92,7 +103,7 @@ export const useBulkClearTaskInstances = ({ clearSelections, onSuccessConfirm }:
),
);

await invalidateQueries();
await invalidateQueries(new Set([...byDagRun.values()].map(({ dagId }) => dagId)));

toaster.create({
description: translate("toaster.bulkClear.success.description", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import type {
} from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";

import { tiPerAttemptQueryKeys } from "./gridViewQueryKeys";

type Props = {
readonly clearSelections: VoidFunction;
readonly onSuccessConfirm: VoidFunction;
Expand Down Expand Up @@ -62,6 +64,7 @@ export const useBulkTaskInstances = ({ clearSelections, onSuccessConfirm }: Prop
await Promise.all([
queryClient.invalidateQueries({ queryKey: [useTaskInstanceServiceGetTaskInstancesKey] }),
queryClient.invalidateQueries({ queryKey: [useDagRunServiceGetDagRunsKey] }),
...tiPerAttemptQueryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
]);

const isDelete = Boolean(responseData.delete);
Expand Down
3 changes: 2 additions & 1 deletion airflow-core/src/airflow/ui/src/queries/useClearRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";

import { gridQueryKeys } from "./gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";

export const useClearDagRun = ({
Expand Down Expand Up @@ -61,6 +61,7 @@ export const useClearDagRun = ({
[useDagRunServiceGetDagRunsKey],
[useClearDagRunDryRunKey, dagId],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
...tiPerAttemptQueryKeys,
];

await Promise.all([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import type { ApiError } from "openapi/requests";
import type { ClearTaskInstancesBody, TaskInstanceCollectionResponse } from "openapi/requests/types.gen";
import { toaster } from "src/components/ui";

import { gridQueryKeys } from "./gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

Expand Down Expand Up @@ -119,6 +119,7 @@ export const useClearTaskInstances = ({
[useClearTaskInstancesDryRunKey, dagId],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
...tiPerAttemptQueryKeys,
];

await Promise.all([
Expand Down
9 changes: 7 additions & 2 deletions airflow-core/src/airflow/ui/src/queries/useDeleteDagRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import {
useDagRunServiceDeleteDagRun,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
useTaskInstanceServiceGetTaskInstancesKey,
UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
useTaskInstanceServiceGetMappedTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
import { gridQueryKeys } from "src/queries/gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "src/queries/gridViewQueryKeys";
import { createErrorToaster } from "src/utils";

type DeleteDagRunParams = {
Expand Down Expand Up @@ -57,6 +59,9 @@ export const useDeleteDagRun = ({ dagId, dagRunId, onSuccessConfirm }: DeleteDag
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetHitlDetailsKey],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
[useTaskInstanceServiceGetMappedTaskInstanceKey],
...tiPerAttemptQueryKeys,
];

await Promise.all([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ import {
useTaskInstanceServiceGetTaskInstancesKey,
useDagRunServiceGetDagRunsKey,
UseDagRunServiceGetDagRunKeyFn,
UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
useTaskInstanceServiceGetMappedTaskInstanceKey,
} from "openapi/queries";
import { toaster } from "src/components/ui";
import { createErrorToaster } from "src/utils";

import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";

type DeleteTaskInstanceParams = {
dagId: string;
dagRunId: string;
Expand Down Expand Up @@ -66,9 +70,15 @@ export const useDeleteTaskInstance = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }],
[useTaskInstanceServiceGetHitlDetailsKey],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
[useTaskInstanceServiceGetMappedTaskInstanceKey],
...tiPerAttemptQueryKeys,
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
await Promise.all([
...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
]);

toaster.create({
description: translate("dags:runAndTaskActions.delete.success.description", {
Expand Down
42 changes: 42 additions & 0 deletions airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useState } from "react";

import {
useDagRunServiceGetDagRunsKey,
useGridServiceGetGridRunsKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import type { GridTISummaries, TaskInstanceState } from "openapi/requests";
import { OpenAPI } from "openapi/requests/core/OpenAPI";
import { isStatePending, useAutoRefresh } from "src/utils";

const GRID_MUTATION_WATCHED_KEYS = new Set([
useTaskInstanceServiceGetTaskInstancesKey,
useGridServiceGetGridRunsKey,
useDagRunServiceGetDagRunsKey,
]);

/**
* Streams TI summaries for all grid runs over a single HTTP connection (NDJSON).
*
Expand All @@ -41,6 +53,7 @@ export const useGridTiSummariesStream = ({
runIds: Array<string>;
states?: Array<TaskInstanceState | null | undefined>;
}) => {
const queryClient = useQueryClient();
const [summariesByRunId, setSummariesByRunId] = useState<Map<string, GridTISummaries>>(new Map());
const [refreshTick, setRefreshTick] = useState(0);

Expand Down Expand Up @@ -124,5 +137,34 @@ export const useGridTiSummariesStream = ({
return () => clearInterval(timer);
}, [hasActiveRuns, baseRefetchInterval]);

// Re-stream whenever a mutation invalidates a grid-related query (TI states,
// run states, or grid structure). Invalidation events only fire from explicit
// invalidateQueries() calls — never from polling intervals — so this never
// double-fires with the interval-based refresh above.
useEffect(() => {
let timeoutId: ReturnType<typeof setTimeout> | undefined;

const unsubscribe = queryClient.getQueryCache().subscribe((event) => {
const [firstKey] = event.query.queryKey as Array<unknown>;

if (
event.type === "updated" &&
event.action.type === "invalidate" &&
typeof firstKey === "string" &&
GRID_MUTATION_WATCHED_KEYS.has(firstKey)
) {
// Debounce: a single mutation invalidates several matching queries in one tick.
clearTimeout(timeoutId);
// eslint-disable-next-line max-nested-callbacks
timeoutId = setTimeout(() => setRefreshTick((tick) => tick + 1), 0);
}
});

return () => {
unsubscribe();
clearTimeout(timeoutId);
};
}, [queryClient]);

return { summariesByRunId };
};
9 changes: 8 additions & 1 deletion airflow-core/src/airflow/ui/src/queries/usePatchDagRun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
useDagRunServicePatchDagRun,
UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetMappedTaskInstanceKey,
useTaskInstanceServiceGetTaskInstanceKey,
useTaskInstanceServiceGetTaskInstancesKey,
} from "openapi/queries";
import { createErrorToaster } from "src/utils";

import { gridQueryKeys } from "./gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearDagRunDryRunKey } from "./useClearDagRunDryRun";

export const usePatchDagRun = ({
Expand Down Expand Up @@ -58,7 +61,11 @@ export const usePatchDagRun = ({
UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
[useDagRunServiceGetDagRunsKey],
[useTaskInstanceServiceGetTaskInstancesKey, { dagId, dagRunId }],
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId }],
[useTaskInstanceServiceGetMappedTaskInstanceKey, { dagId, dagRunId }],
[useClearDagRunDryRunKey, dagId],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
...tiPerAttemptQueryKeys,
];

await Promise.all([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";

import { gridQueryKeys } from "./gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskGroupDryRunKey } from "./usePatchTaskGroupDryRun";

Expand Down Expand Up @@ -59,6 +59,7 @@ export const usePatchTaskGroup = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskGroupDryRunKey, dagId, dagRunId, groupId],
[useClearTaskInstancesDryRunKey, dagId],
...tiPerAttemptQueryKeys,
];

await Promise.all([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from "openapi/queries";
import { createErrorToaster } from "src/utils";

import { gridQueryKeys } from "./gridViewQueryKeys";
import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";

Expand Down Expand Up @@ -65,6 +65,7 @@ export const usePatchTaskInstance = ({
[useTaskInstanceServiceGetTaskInstancesKey],
[usePatchTaskInstanceDryRunKey, dagId, dagRunId, { mapIndex, taskId }],
[useClearTaskInstancesDryRunKey, dagId],
...tiPerAttemptQueryKeys,
];

if (mapIndex !== undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { useTranslation } from "react-i18next";
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
UseGanttServiceGetGanttDataKeyFn,
useTaskInstanceServiceGetHitlDetailsKey,
useTaskInstanceServiceGetHitlDetailKey,
useTaskInstanceServiceUpdateHitlDetail,
Expand All @@ -33,6 +34,8 @@ import { toaster } from "src/components/ui/Toaster";
import { createErrorToaster } from "src/utils";
import type { HITLResponseParams } from "src/utils/hitl";

import { gridQueryKeys, tiPerAttemptQueryKeys } from "./gridViewQueryKeys";

export const useUpdateHITLDetail = ({
dagId,
dagRunId,
Expand All @@ -55,9 +58,14 @@ export const useUpdateHITLDetail = ({
[useTaskInstanceServiceGetTaskInstanceKey, { dagId, dagRunId, mapIndex, taskId }],
[useTaskInstanceServiceGetHitlDetailsKey, { dagIdPrefixPattern: dagId, dagRunId }],
[useTaskInstanceServiceGetHitlDetailKey, { dagId, dagRunId }],
UseGanttServiceGetGanttDataKeyFn({ dagId, runId: dagRunId }),
...tiPerAttemptQueryKeys,
];

await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })));
await Promise.all([
...queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: key })),
...gridQueryKeys(dagId).map((key) => queryClient.invalidateQueries({ queryKey: key })),
]);

toaster.create({
title: translate("response.success", { taskId }),
Expand Down
Loading