Skip to content

Commit

Permalink
Switch to dag_maker in external task sensor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Jan 14, 2025
1 parent 28feea5 commit 66d0a33
Showing 1 changed file with 13 additions and 34 deletions.
47 changes: 13 additions & 34 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,17 +439,11 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c
f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... "
) in caplog.messages

def test_external_dag_sensor(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
other_dag.create_dagrun(
run_id="test",
start_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
**triggered_by_kwargs,
)
def test_external_dag_sensor(self, dag_maker):
with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"):
pass
dag_maker.create_dagrun(state=DagRunState.SUCCESS)

op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
external_dag_id="other_dag",
Expand All @@ -458,17 +452,10 @@ def test_external_dag_sensor(self):
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_external_dag_sensor_log(self, caplog):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
other_dag.create_dagrun(
run_id="test",
start_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
**triggered_by_kwargs,
)
def test_external_dag_sensor_log(self, caplog, dag_maker):
with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"):
pass
dag_maker.create_dagrun(state=DagRunState.SUCCESS)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
external_dag_id="other_dag",
Expand All @@ -477,17 +464,10 @@ def test_external_dag_sensor_log(self, caplog):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... ") in caplog.messages

def test_external_dag_sensor_soft_fail_as_skipped(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
other_dag.create_dagrun(
run_id="test",
start_date=DEFAULT_DATE,
logical_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
**triggered_by_kwargs,
)
def test_external_dag_sensor_soft_fail_as_skipped(self, dag_maker, session):
with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"):
pass
dag_maker.create_dagrun(state=DagRunState.SUCCESS)
op = ExternalTaskSensor(
task_id="test_external_dag_sensor_check",
external_dag_id="other_dag",
Expand All @@ -502,7 +482,6 @@ def test_external_dag_sensor_soft_fail_as_skipped(self):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

# then
session = settings.Session()
TI = TaskInstance
task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all()
assert len(task_instances) == 1, "Unexpected number of task instances"
Expand Down

0 comments on commit 66d0a33

Please sign in to comment.