Skip to content

Commit ae3539f

Browse files
committed
Pass run_after to DagRun in tests
1 parent c7e89df commit ae3539f

File tree

9 files changed

+58
-15
lines changed

9 files changed

+58
-15
lines changed

providers/tests/apache/kylin/operators/test_kylin_cube.py

+2
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ def test_render_template(self, session):
176176
dag_id=self.dag.dag_id,
177177
run_id="kylin_test",
178178
logical_date=DEFAULT_DATE,
179+
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
180+
run_after=DEFAULT_DATE,
179181
run_type=DagRunType.MANUAL,
180182
)
181183
else:

providers/tests/apache/spark/operators/test_spark_submit.py

+2
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ def test_render_template(self, session):
200200
dag_id=self.dag.dag_id,
201201
run_id="spark_test",
202202
logical_date=DEFAULT_DATE,
203+
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
204+
run_after=DEFAULT_DATE,
203205
run_type=DagRunType.MANUAL,
204206
)
205207
else:

providers/tests/cncf/kubernetes/operators/test_pod.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from airflow.utils.types import DagRunType
5252

5353
from tests_common.test_utils import db
54+
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
5455

5556
pytestmark = pytest.mark.db_test
5657

@@ -92,11 +93,23 @@ def create_context(task, persist_to_db=False, map_index=None):
9293
else:
9394
dag = DAG(dag_id="dag", schedule=None, start_date=pendulum.now())
9495
dag.add_task(task)
95-
dag_run = DagRun(
96-
run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
97-
run_type=DagRunType.MANUAL,
98-
dag_id=dag.dag_id,
99-
)
96+
now = timezone.utcnow()
97+
if AIRFLOW_V_3_0_PLUS:
98+
dag_run = DagRun(
99+
run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
100+
run_type=DagRunType.MANUAL,
101+
dag_id=dag.dag_id,
102+
logical_date=now,
103+
data_interval=(now, now),
104+
run_after=now,
105+
)
106+
else:
107+
dag_run = DagRun(
108+
run_id=DagRun.generate_run_id(DagRunType.MANUAL, DEFAULT_DATE),
109+
run_type=DagRunType.MANUAL,
110+
dag_id=dag.dag_id,
111+
execution_date=now,
112+
)
100113
task_instance = TaskInstance(task=task, run_id=dag_run.run_id)
101114
task_instance.dag_run = dag_run
102115
if map_index is not None:

providers/tests/fab/auth_manager/api_endpoints/test_dag_run_endpoint.py

+16-8
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,26 @@ def teardown_method(self) -> None:
142142
def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commit=True, idx_start=1):
143143
dag_runs = []
144144
dags = []
145-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
145+
146+
def _v3_kwargs(date):
147+
if AIRFLOW_V_3_0_PLUS:
148+
return {
149+
"data_interval": (date, date),
150+
"logical_date": date,
151+
"run_after": date,
152+
"triggered_by": DagRunTriggeredByType.TEST,
153+
}
154+
return {"execution_date": date}
146155

147156
for i in range(idx_start, idx_start + 2):
148157
dagrun_model = DagRun(
149158
dag_id="TEST_DAG_ID",
150159
run_id=f"TEST_DAG_RUN_ID_{i}",
151160
run_type=DagRunType.MANUAL,
152-
logical_date=timezone.parse(self.default_time) + timedelta(days=i - 1),
153161
start_date=timezone.parse(self.default_time),
154162
external_trigger=True,
155163
state=state,
156-
**triggered_by_kwargs,
164+
**_v3_kwargs(timezone.parse(self.default_time) + timedelta(days=i - 1)),
157165
)
158166
dagrun_model.updated_at = timezone.parse(self.default_time)
159167
dag_runs.append(dagrun_model)
@@ -166,10 +174,10 @@ def _create_test_dag_run(self, state=DagRunState.RUNNING, extra_dag=False, commi
166174
dag_id=f"TEST_DAG_ID_{i}",
167175
run_id=f"TEST_DAG_RUN_ID_{i}",
168176
run_type=DagRunType.MANUAL,
169-
logical_date=timezone.parse(self.default_time_2),
170177
start_date=timezone.parse(self.default_time),
171178
external_trigger=True,
172179
state=state,
180+
**_v3_kwargs(timezone.parse(self.default_time_2)),
173181
)
174182
)
175183
if commit:
@@ -203,8 +211,8 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
203211
"external_trigger": True,
204212
"start_date": self.default_time,
205213
"conf": {},
206-
"data_interval_end": None,
207-
"data_interval_start": None,
214+
"data_interval_end": self.default_time,
215+
"data_interval_start": self.default_time,
208216
"last_scheduling_decision": None,
209217
"run_type": "manual",
210218
"note": None,
@@ -219,8 +227,8 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
219227
"external_trigger": True,
220228
"start_date": self.default_time,
221229
"conf": {},
222-
"data_interval_end": None,
223-
"data_interval_start": None,
230+
"data_interval_end": self.default_time_2,
231+
"data_interval_start": self.default_time_2,
224232
"last_scheduling_decision": None,
225233
"run_type": "manual",
226234
"note": None,

providers/tests/fab/auth_manager/api_endpoints/test_task_instance_endpoint.py

+2
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ def create_task_instances(
166166
run_id=run_id,
167167
dag_id=dag_id,
168168
logical_date=logical_date,
169+
data_interval=(logical_date, logical_date),
170+
run_after=logical_date,
169171
run_type=DagRunType.MANUAL,
170172
state=dag_run_state,
171173
)

providers/tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py

+4
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ def _create_xcom_entries(self, dag_id, run_id, logical_date, task_id, mapped_ti=
180180
dag_id=dag_id,
181181
run_id=run_id,
182182
logical_date=logical_date,
183+
data_interval=(logical_date, logical_date),
184+
run_after=logical_date,
183185
start_date=logical_date,
184186
run_type=DagRunType.MANUAL,
185187
)
@@ -226,6 +228,7 @@ def _create_invalid_xcom_entries(self, logical_date):
226228
dag_id="invalid_dag",
227229
run_id="invalid_run_id",
228230
logical_date=logical_date + timedelta(days=1),
231+
run_after=logical_date,
229232
start_date=logical_date,
230233
run_type=DagRunType.MANUAL,
231234
)
@@ -243,6 +246,7 @@ def _create_invalid_xcom_entries(self, logical_date):
243246
dag_id="invalid_dag",
244247
run_id="not_this_run_id",
245248
logical_date=logical_date,
249+
run_after=logical_date,
246250
start_date=logical_date,
247251
run_type=DagRunType.MANUAL,
248252
)

providers/tests/redis/log/test_redis_task_handler.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,14 @@ def ti(self):
4242
dag = DAG(dag_id="dag_for_testing_redis_task_handler", schedule=None, start_date=date)
4343
task = EmptyOperator(task_id="task_for_testing_redis_log_handler", dag=dag)
4444
if AIRFLOW_V_3_0_PLUS:
45-
dag_run = DagRun(dag_id=dag.dag_id, logical_date=date, run_id="test", run_type="scheduled")
45+
dag_run = DagRun(
46+
dag_id=dag.dag_id,
47+
logical_date=date,
48+
data_interval=(date, date),
49+
run_after=date,
50+
run_id="test",
51+
run_type="scheduled",
52+
)
4653
else:
4754
dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")
4855

tests/jobs/test_triggerer_job.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,13 @@ def session():
9393
def create_trigger_in_db(session, trigger, operator=None):
9494
dag_model = DagModel(dag_id="test_dag")
9595
dag = DAG(dag_id=dag_model.dag_id, schedule="@daily", start_date=pendulum.datetime(2023, 1, 1))
96+
date = pendulum.datetime(2023, 1, 1)
9697
run = DagRun(
9798
dag_id=dag_model.dag_id,
9899
run_id="test_run",
99-
logical_date=pendulum.datetime(2023, 1, 1),
100+
logical_date=date,
101+
data_interval=(date, date),
102+
run_after=date,
100103
run_type=DagRunType.MANUAL,
101104
)
102105
trigger_orm = Trigger.from_object(trigger)

tests/models/test_xcom.py

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def func(*, dag_id, task_id, logical_date):
6464
run_type=DagRunType.SCHEDULED,
6565
run_id=run_id,
6666
logical_date=logical_date,
67+
data_interval=(logical_date, logical_date),
68+
run_after=logical_date,
6769
)
6870
session.add(run)
6971
ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id)

0 commit comments

Comments
 (0)