From 296f24784a9b32e79b0ff6d5904fa527dfb0fe2e Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Thu, 9 Jan 2025 11:38:50 +0800 Subject: [PATCH] Set logical date correctly in task runner tests --- tests/task/test_standard_task_runner.py | 30 +++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/tests/task/test_standard_task_runner.py b/tests/task/test_standard_task_runner.py index 5e9eee15b07dd..51c5f9f8fa3e8 100644 --- a/tests/task/test_standard_task_runner.py +++ b/tests/task/test_standard_task_runner.py @@ -40,6 +40,7 @@ from airflow.utils.platform import getuser from airflow.utils.state import State from airflow.utils.timeout import timeout +from airflow.utils.types import DagRunType from tests.listeners import xcom_listener from tests.listeners.file_write_listener import FileWriteListener @@ -173,7 +174,7 @@ def test_start_and_terminate(self, mock_init, mock_read_task_utilization): mock_read_task_utilization.assert_called() @pytest.mark.db_test - def test_notifies_about_start_and_stop(self, tmp_path): + def test_notifies_about_start_and_stop(self, tmp_path, session): path_listener_writer = tmp_path / "test_notifies_about_start_and_stop" lm = get_listener_manager() @@ -185,14 +186,19 @@ def test_notifies_about_start_and_stop(self, tmp_path): ) dag = dagbag.dags.get("test_example_bash_operator") task = dag.get_task("runme_1") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, + session=session, **triggered_by_kwargs, ) + session.commit() ti = TaskInstance(task=task, run_id="test") job = Job(dag_id=ti.dag_id) job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True) @@ -228,12 +234,15 @@ def test_notifies_about_fail(self, tmp_path): ) dag = dagbag.dags.get("test_failing_bash_operator") task = dag.get_task("failing_task") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, **triggered_by_kwargs, ) ti = TaskInstance(task=task, run_id="test") @@ -275,12 +284,15 @@ def test_ol_does_not_block_xcoms(self, tmp_path): ) dag = dagbag.dags.get("test_dag_xcom_openlineage") task = dag.get_task("push_and_pull") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, **triggered_by_kwargs, ) @@ -408,7 +420,9 @@ def test_on_kill(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", + logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, **triggered_by_kwargs, @@ -463,7 +477,9 @@ def test_parsing_context(self): dag.create_dagrun( run_id="test", + logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, **triggered_by_kwargs,