Skip to content

Commit

Permalink
Set logical date correctly in task runner tests
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Jan 9, 2025
1 parent 809b78e commit 296f247
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions tests/task/test_standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.utils.platform import getuser
from airflow.utils.state import State
from airflow.utils.timeout import timeout
from airflow.utils.types import DagRunType

from tests.listeners import xcom_listener
from tests.listeners.file_write_listener import FileWriteListener
Expand Down Expand Up @@ -173,7 +174,7 @@ def test_start_and_terminate(self, mock_init, mock_read_task_utilization):
mock_read_task_utilization.assert_called()

@pytest.mark.db_test
def test_notifies_about_start_and_stop(self, tmp_path):
def test_notifies_about_start_and_stop(self, tmp_path, session):
path_listener_writer = tmp_path / "test_notifies_about_start_and_stop"

lm = get_listener_manager()
Expand All @@ -185,14 +186,19 @@ def test_notifies_about_start_and_stop(self, tmp_path):
)
dag = dagbag.dags.get("test_example_bash_operator")
task = dag.get_task("runme_1")
current_time = timezone.utcnow()
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
logical_date=current_time,
data_interval=(current_time, current_time),
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
start_date=current_time,
session=session,
**triggered_by_kwargs,
)
session.commit()
ti = TaskInstance(task=task, run_id="test")
job = Job(dag_id=ti.dag_id)
job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True)
Expand Down Expand Up @@ -228,12 +234,15 @@ def test_notifies_about_fail(self, tmp_path):
)
dag = dagbag.dags.get("test_failing_bash_operator")
task = dag.get_task("failing_task")
current_time = timezone.utcnow()
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
logical_date=current_time,
data_interval=(current_time, current_time),
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
start_date=current_time,
**triggered_by_kwargs,
)
ti = TaskInstance(task=task, run_id="test")
Expand Down Expand Up @@ -275,12 +284,15 @@ def test_ol_does_not_block_xcoms(self, tmp_path):
)
dag = dagbag.dags.get("test_dag_xcom_openlineage")
task = dag.get_task("push_and_pull")
current_time = timezone.utcnow()
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
logical_date=current_time,
data_interval=(current_time, current_time),
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
start_date=current_time,
**triggered_by_kwargs,
)

Expand Down Expand Up @@ -408,7 +420,9 @@ def test_on_kill(self):
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dag.create_dagrun(
run_id="test",
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
**triggered_by_kwargs,
Expand Down Expand Up @@ -463,7 +477,9 @@ def test_parsing_context(self):

dag.create_dagrun(
run_id="test",
logical_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
run_type=DagRunType.MANUAL,
state=State.RUNNING,
start_date=DEFAULT_DATE,
**triggered_by_kwargs,
Expand Down

0 comments on commit 296f247

Please sign in to comment.