Skip to content
Closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { fireEvent, render } from "@testing-library/react";
import { beforeEach, describe, expect, it, vi } from "vitest";

import { Wrapper } from "src/utils/Wrapper";

import { FieldDateTime } from "./FieldDateTime";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const mockParamsDict: Record<string, any> = {};
const mockSetParamsDict = vi.fn();

vi.mock("src/queries/useParamStore", () => ({
paramPlaceholder: {
schema: {},
value: null,
},
useParamStore: () => ({
disabled: false,
paramsDict: mockParamsDict,
setParamsDict: mockSetParamsDict,
}),
}));

describe("FieldDateTime", () => {
beforeEach(() => {
Object.keys(mockParamsDict).forEach((key) => {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete mockParamsDict[key];
});
mockSetParamsDict.mockClear();
});

describe("type=time (format:time)", () => {
it("renders with step=1 so the browser shows the seconds field", () => {
mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" };

const { container } = render(
<FieldDateTime name="cutoff_time" onUpdate={vi.fn()} type="time" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

expect(input.step).toBe("1");
});

it("pads HH:MM to HH:MM:SS when the user enters a time without seconds", () => {
mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" };
const onUpdate = vi.fn();

const { container } = render(
<FieldDateTime name="cutoff_time" onUpdate={onUpdate} type="time" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

fireEvent.change(input, { target: { value: "19:30" } });

expect(onUpdate).toHaveBeenCalledWith("19:30:00");
});

it("does not double-pad when value already contains seconds", () => {
mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "" };
const onUpdate = vi.fn();

const { container } = render(
<FieldDateTime name="cutoff_time" onUpdate={onUpdate} type="time" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

fireEvent.change(input, { target: { value: "19:30:45" } });

expect(onUpdate).toHaveBeenCalledWith("19:30:45");
});

it("passes an empty value through unmodified", () => {
mockParamsDict.cutoff_time = { schema: { format: "time", type: "string" }, value: "10:00:00" };
const onUpdate = vi.fn();

const { container } = render(
<FieldDateTime name="cutoff_time" onUpdate={onUpdate} type="time" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

fireEvent.change(input, { target: { value: "" } });

expect(onUpdate).toHaveBeenCalledWith("");
});
});

describe("type=date (format:date)", () => {
it("renders without a step attribute", () => {
mockParamsDict.my_date = { schema: { format: "date", type: "string" }, value: "" };

const { container } = render(
<FieldDateTime name="my_date" onUpdate={vi.fn()} type="date" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

expect(input.step).toBe("");
});

it("passes the typed value through without modification", () => {
mockParamsDict.my_date = { schema: { format: "date", type: "string" }, value: "" };
const onUpdate = vi.fn();

const { container } = render(
<FieldDateTime name="my_date" onUpdate={onUpdate} type="date" />,
{ wrapper: Wrapper },
);

const input = container.querySelector("input") as HTMLInputElement;

fireEvent.change(input, { target: { value: "2026-05-07" } });

expect(onUpdate).toHaveBeenCalledWith("2026-05-07");
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,15 @@ export const FieldDateTime = ({
disabled={disabled}
id={`element_${name}`}
name={`element_${name}`}
onChange={(event) => handleChange(event.target.value)}
onChange={(event) => {
const val = event.target.value;
handleChange(rest.type === "time" && val.length === 5 ? `${val}:00` : val);
}}
required={rest.required}
size="sm"
step={rest.type === "time" ? 1 : undefined}
type={rest.type}
value={((param.value ?? "") as string).slice(0, 16)}
value={((param.value ?? "") as string).slice(0, rest.type === "time" ? 8 : 16)}
/>
);
};
};
8 changes: 4 additions & 4 deletions airflow-core/src/airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@ def _do_delete(
session.execute(delete)
session.commit()

except BaseException as e:
raise e
except BaseException:
session.rollback()
raise
finally:
if target_table is not None and skip_archive:
bind = session.get_bind()
target_table.drop(bind=bind)
target_table.drop(bind=session.connection())
session.commit()

print("Finished Performing Delete")
Expand Down
58 changes: 56 additions & 2 deletions airflow-core/tests/unit/utils/test_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
from importlib import import_module
from io import StringIO
from pathlib import Path
from unittest.mock import MagicMock, mock_open, patch
from types import SimpleNamespace
from unittest.mock import MagicMock, call, create_autospec, mock_open, patch
from uuid import uuid4

import pendulum
import pytest
from sqlalchemy import func, inspect, select, text
from sqlalchemy import Column, Integer, MetaData, Table, func, inspect, select, text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.orm import Session

from airflow import DAG
from airflow._shared.timezones import timezone
Expand All @@ -45,6 +47,7 @@
_build_query,
_cleanup_table,
_confirm_drop_archives,
_do_delete,
_dump_table_to_file,
_get_archived_table_names,
config_dict,
Expand Down Expand Up @@ -487,6 +490,57 @@ def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock):
pass
archived_table_names = _get_archived_table_names(["dag_run"], session)
assert len(archived_table_names) == 0

@patch("airflow.utils.db_cleanup.reflect_tables")
def test_skip_archive_failure_rolls_back_before_drop_on_session_connection(self, reflect_tables_mock):
"""
Verify skip_archive cleanup releases the failed transaction before dropping the archive table.

MySQL metadata locks from a failed DELETE are only released after rollback. Dropping the archive
table through the same session connection after rollback avoids self-deadlocking on a second
pooled connection.
"""

class FakeModel:
name = "source_table"

source_table = Table("source_table", MetaData(), Column("id", Integer, primary_key=True))
archive_tables = {}
archive_drop = create_autospec(lambda *, bind: None)
session = MagicMock(spec=Session)
session.get_bind.return_value = SimpleNamespace(dialect=SimpleNamespace(name="mysql"))
session.scalars.return_value.one.return_value = 1
session.connection.return_value = MagicMock(name="session_connection", spec=[])
session.execute.side_effect = [None, None, SQLAlchemyError("delete failed")]

def reflect_side_effect(table_names, session):
metadata = SimpleNamespace(tables={})
for table_name in table_names:
if table_name == FakeModel.name:
metadata.tables[table_name] = source_table
else:
archive_table = archive_tables.setdefault(
table_name, Table(table_name, MetaData(), Column("id", Integer, primary_key=True))
)
archive_table.drop = archive_drop
metadata.tables[table_name] = archive_table
return metadata

reflect_tables_mock.side_effect = reflect_side_effect

with pytest.raises(SQLAlchemyError, match="delete failed"):
_do_delete(
query=select(source_table),
orm_model=FakeModel,
skip_archive=True,
session=session,
batch_size=None,
)

session.rollback.assert_called_once()
session.connection.assert_called_once()
archive_drop.assert_called_once_with(bind=session.connection.return_value)
assert session.mock_calls.index(call.rollback()) < session.mock_calls.index(call.connection())

def test_no_models_missing(self):
"""
Expand Down