diff --git a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.test.tsx b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.test.tsx new file mode 100644 index 0000000000000..d6d423abea3ac --- /dev/null +++ b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.test.tsx @@ -0,0 +1,144 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { fireEvent, render } from "@testing-library/react"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { Wrapper } from "src/utils/Wrapper"; + +import { FieldDateTime } from "./FieldDateTime"; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const mockParamsDict: Record = {}; +const mockSetParamsDict = vi.fn(); + +vi.mock("src/queries/useParamStore", () => ({ + paramPlaceholder: { + schema: {}, + value: null, + }, + useParamStore: () => ({ + disabled: false, + paramsDict: mockParamsDict, + setParamsDict: mockSetParamsDict, + }), +})); + +describe("FieldDateTime", () => { + beforeEach(() => { + Object.keys(mockParamsDict).forEach((key) => { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete mockParamsDict[key]; + }); + mockSetParamsDict.mockClear(); + }); + + describe("type=time (format:time)", () => { + it("renders with step=1 so the browser shows the seconds field", () => { + mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" }; + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + expect(input.step).toBe("1"); + }); + + it("pads HH:MM to HH:MM:SS when the user enters a time without seconds", () => { + mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" }; + const onUpdate = vi.fn(); + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + fireEvent.change(input, { target: { value: "19:30" } }); + + expect(onUpdate).toHaveBeenCalledWith("19:30:00"); + }); + + it("does not double-pad when value already contains seconds", () => { + mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" }; + const onUpdate = vi.fn(); + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + fireEvent.change(input, { target: { value: "19:30:45" } }); + + expect(onUpdate).toHaveBeenCalledWith("19:30:45"); + }); + + it("passes an empty value through unmodified", () => { + mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "10:00:00" }; + const onUpdate = vi.fn(); + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + fireEvent.change(input, { target: { value: "" } }); + + expect(onUpdate).toHaveBeenCalledWith(""); + }); + }); + + describe("type=date (format:date)", () => { + it("renders without a step attribute", () => { + mockParamsDict.my_date = { schema: { format: "date", type: "string" }, value: "" }; + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + expect(input.step).toBe(""); + }); + + it("passes the typed value through without modification", () => { + mockParamsDict.my_date = { schema: { format: "date", type: "string" }, value: "" }; + const onUpdate = vi.fn(); + + const { container } = render( + , + { wrapper: Wrapper }, + ); + + const input = container.querySelector("input") as HTMLInputElement; + + fireEvent.change(input, { target: { value: "2026-05-07" } }); + + expect(onUpdate).toHaveBeenCalledWith("2026-05-07"); + }); + }); +}); \ No newline at end of file diff --git a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.tsx b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.tsx index 9c6106d225c9a..41496c50c1b23 100644 --- a/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.tsx +++ b/airflow-core/src/airflow/ui/src/components/FlexibleForm/FieldDateTime.tsx @@ -59,11 +59,15 @@ export const FieldDateTime = ({ disabled={disabled} id={`element_${name}`} name={`element_${name}`} - onChange={(event) => handleChange(event.target.value)} + onChange={(event) => { + const val = event.target.value; + handleChange(rest.type === "time" && val.length === 5 ? `${val}:00` : val); + }} required={rest.required} size="sm" + step={rest.type === "time" ? 1 : undefined} type={rest.type} - value={((param.value ?? "") as string).slice(0, 16)} + value={((param.value ?? "") as string).slice(0, rest.type === "time" ? 8 : 16)} /> ); -}; +}; \ No newline at end of file diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index e6b5283669b86..4bedb4ad053d3 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -282,12 +282,12 @@ def _do_delete( session.execute(delete) session.commit() - except BaseException as e: - raise e + except BaseException: + session.rollback() + raise finally: if target_table is not None and skip_archive: - bind = session.get_bind() - target_table.drop(bind=bind) + target_table.drop(bind=session.connection()) session.commit() print("Finished Performing Delete") diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 2ddca75ec57c4..b0e72afcf5fcb 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -21,14 +21,16 @@ from importlib import import_module from io import StringIO from pathlib import Path -from unittest.mock import MagicMock, mock_open, patch +from types import SimpleNamespace +from unittest.mock import MagicMock, call, create_autospec, mock_open, patch from uuid import uuid4 import pendulum import pytest -from sqlalchemy import func, inspect, select, text +from sqlalchemy import Column, Integer, MetaData, Table, func, inspect, select, text from sqlalchemy.exc import OperationalError, SQLAlchemyError from sqlalchemy.ext.declarative import DeclarativeMeta +from sqlalchemy.orm import Session from airflow import DAG from airflow._shared.timezones import timezone @@ -45,6 +47,7 @@ _build_query, _cleanup_table, _confirm_drop_archives, + _do_delete, _dump_table_to_file, _get_archived_table_names, config_dict, @@ -487,6 +490,57 @@ def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock): pass archived_table_names = _get_archived_table_names(["dag_run"], session) assert len(archived_table_names) == 0 + + @patch("airflow.utils.db_cleanup.reflect_tables") + def test_skip_archive_failure_rolls_back_before_drop_on_session_connection(self, reflect_tables_mock): + """ + Verify skip_archive cleanup releases the failed transaction before dropping the archive table. + + MySQL metadata locks from a failed DELETE are only released after rollback. Dropping the archive + table through the same session connection after rollback avoids self-deadlocking on a second + pooled connection. + """ + + class FakeModel: + name = "source_table" + + source_table = Table("source_table", MetaData(), Column("id", Integer, primary_key=True)) + archive_tables = {} + archive_drop = create_autospec(lambda *, bind: None) + session = MagicMock(spec=Session) + session.get_bind.return_value = SimpleNamespace(dialect=SimpleNamespace(name="mysql")) + session.scalars.return_value.one.return_value = 1 + session.connection.return_value = MagicMock(name="session_connection", spec=[]) + session.execute.side_effect = [None, None, SQLAlchemyError("delete failed")] + + def reflect_side_effect(table_names, session): + metadata = SimpleNamespace(tables={}) + for table_name in table_names: + if table_name == FakeModel.name: + metadata.tables[table_name] = source_table + else: + archive_table = archive_tables.setdefault( + table_name, Table(table_name, MetaData(), Column("id", Integer, primary_key=True)) + ) + archive_table.drop = archive_drop + metadata.tables[table_name] = archive_table + return metadata + + reflect_tables_mock.side_effect = reflect_side_effect + + with pytest.raises(SQLAlchemyError, match="delete failed"): + _do_delete( + query=select(source_table), + orm_model=FakeModel, + skip_archive=True, + session=session, + batch_size=None, + ) + + session.rollback.assert_called_once() + session.connection.assert_called_once() + archive_drop.assert_called_once_with(bind=session.connection.return_value) + assert session.mock_calls.index(call.rollback()) < session.mock_calls.index(call.connection()) def test_no_models_missing(self): """