Skip to content

Commit c7e89df

Browse files
committed
Pass run_after to create_dagrun in tests
1 parent e47f521 commit c7e89df

30 files changed

+223
-307
lines changed

providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def task_callable(ti):
126126
if AIRFLOW_V_3_0_PLUS:
127127
dagrun_kwargs: dict = {
128128
"logical_date": DEFAULT_DATE,
129+
"run_after": DEFAULT_DATE,
129130
"triggered_by": DagRunTriggeredByType.TEST,
130131
}
131132
else:

providers/tests/common/sql/operators/test_sql.py

+7
Original file line numberDiff line numberDiff line change
@@ -1163,6 +1163,7 @@ def test_branch_single_value_with_dag_run(self, mock_get_db_hook):
11631163
if AIRFLOW_V_3_0_PLUS:
11641164
dagrun_kwargs = {
11651165
"logical_date": DEFAULT_DATE,
1166+
"run_after": DEFAULT_DATE,
11661167
"triggered_by": DagRunTriggeredByType.TEST,
11671168
}
11681169
else:
@@ -1213,6 +1214,7 @@ def test_branch_true_with_dag_run(self, mock_get_db_hook):
12131214
if AIRFLOW_V_3_0_PLUS:
12141215
dagrun_kwargs = {
12151216
"logical_date": DEFAULT_DATE,
1217+
"run_after": DEFAULT_DATE,
12161218
"triggered_by": DagRunTriggeredByType.TEST,
12171219
}
12181220
else:
@@ -1263,6 +1265,7 @@ def test_branch_false_with_dag_run(self, mock_get_db_hook):
12631265
if AIRFLOW_V_3_0_PLUS:
12641266
dagrun_kwargs = {
12651267
"logical_date": DEFAULT_DATE,
1268+
"run_after": DEFAULT_DATE,
12661269
"triggered_by": DagRunTriggeredByType.TEST,
12671270
}
12681271
else:
@@ -1314,6 +1317,7 @@ def test_branch_list_with_dag_run(self, mock_get_db_hook):
13141317
if AIRFLOW_V_3_0_PLUS:
13151318
dagrun_kwargs = {
13161319
"logical_date": DEFAULT_DATE,
1320+
"run_after": DEFAULT_DATE,
13171321
"triggered_by": DagRunTriggeredByType.TEST,
13181322
}
13191323
else:
@@ -1362,6 +1366,7 @@ def test_invalid_query_result_with_dag_run(self, mock_get_db_hook):
13621366
if AIRFLOW_V_3_0_PLUS:
13631367
dagrun_kwargs = {
13641368
"logical_date": DEFAULT_DATE,
1369+
"run_after": DEFAULT_DATE,
13651370
"triggered_by": DagRunTriggeredByType.TEST,
13661371
}
13671372
else:
@@ -1401,6 +1406,7 @@ def test_with_skip_in_branch_downstream_dependencies(self, mock_get_db_hook):
14011406
if AIRFLOW_V_3_0_PLUS:
14021407
dagrun_kwargs = {
14031408
"logical_date": DEFAULT_DATE,
1409+
"run_after": DEFAULT_DATE,
14041410
"triggered_by": DagRunTriggeredByType.TEST,
14051411
}
14061412
else:
@@ -1449,6 +1455,7 @@ def test_with_skip_in_branch_downstream_dependencies2(self, mock_get_db_hook):
14491455
if AIRFLOW_V_3_0_PLUS:
14501456
dagrun_kwargs = {
14511457
"logical_date": DEFAULT_DATE,
1458+
"run_after": DEFAULT_DATE,
14521459
"triggered_by": DagRunTriggeredByType.TEST,
14531460
}
14541461
else:

providers/tests/openlineage/plugins/test_execution.py

+2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ def setup_job(self, task_name, run_id):
101101
if AIRFLOW_V_3_0_PLUS:
102102
dagrun_kwargs = {
103103
"logical_date": DEFAULT_DATE,
104+
"run_after": DEFAULT_DATE,
104105
"triggered_by": DagRunTriggeredByType.TEST,
105106
}
106107
else:
@@ -207,6 +208,7 @@ def test_success_overtime_kills_tasks(self):
207208
if AIRFLOW_V_3_0_PLUS:
208209
dagrun_kwargs = {
209210
"logical_date": DEFAULT_DATE,
211+
"run_after": DEFAULT_DATE,
210212
"triggered_by": DagRunTriggeredByType.TEST,
211213
}
212214
else:

providers/tests/openlineage/plugins/test_listener.py

+3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock):
9393
dagrun_kwargs = {
9494
"dag_version": None,
9595
"logical_date": date,
96+
"run_after": date,
9697
"triggered_by": types.DagRunTriggeredByType.TEST,
9798
}
9899
else:
@@ -179,6 +180,7 @@ def sample_callable(**kwargs):
179180
dagrun_kwargs: dict = {
180181
"dag_version": None,
181182
"logical_date": date,
183+
"run_after": date,
182184
"triggered_by": types.DagRunTriggeredByType.TEST,
183185
}
184186
else:
@@ -710,6 +712,7 @@ def simple_callable(**kwargs):
710712
dagrun_kwargs = {
711713
"dag_version": None,
712714
"logical_date": date,
715+
"run_after": date,
713716
"triggered_by": types.DagRunTriggeredByType.TEST,
714717
}
715718
else:

providers/tests/openlineage/plugins/test_utils.py

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def test_get_dagrun_start_end(dag_maker):
105105
if AIRFLOW_V_3_0_PLUS:
106106
dagrun_kwargs = {
107107
"logical_date": data_interval.start,
108+
"run_after": data_interval.end,
108109
"triggered_by": DagRunTriggeredByType.TEST,
109110
}
110111
else:

tests/api_connexion/endpoints/test_extra_link_endpoint.py

+5-8
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,13 @@
3030
from airflow.timetables.base import DataInterval
3131
from airflow.utils import timezone
3232
from airflow.utils.state import DagRunState
33-
from airflow.utils.types import DagRunType
33+
from airflow.utils.types import DagRunTriggeredByType, DagRunType
3434

3535
from tests_common.test_utils.api_connexion_utils import create_user, delete_user
3636
from tests_common.test_utils.compat import BaseOperatorLink
3737
from tests_common.test_utils.db import clear_db_runs, clear_db_xcom
3838
from tests_common.test_utils.mock_operators import CustomOperator
3939
from tests_common.test_utils.mock_plugins import mock_plugin_manager
40-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
41-
42-
if AIRFLOW_V_3_0_PLUS:
43-
from airflow.utils.types import DagRunTriggeredByType
4440

4541
pytestmark = pytest.mark.db_test
4642

@@ -79,15 +75,16 @@ def setup_attrs(self, configured_app, session) -> None:
7975
self.app.dag_bag.dags = {self.dag.dag_id: self.dag}
8076
self.app.dag_bag.sync_to_db("dags-folder", None)
8177

82-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
78+
data_interval = DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2))
8379
self.dag.create_dagrun(
8480
run_id="TEST_DAG_RUN_ID",
8581
logical_date=self.default_time,
8682
run_type=DagRunType.MANUAL,
8783
state=DagRunState.SUCCESS,
8884
session=session,
89-
data_interval=DataInterval(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)),
90-
**triggered_by_kwargs,
85+
data_interval=data_interval,
86+
run_after=data_interval.end,
87+
triggered_by=DagRunTriggeredByType.TEST,
9188
)
9289
session.flush()
9390

tests/api_fastapi/core_api/routes/public/test_extra_links.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,14 @@ def setup(self, test_client, session=None) -> None:
7474
test_client.app.state.dag_bag = dag_bag
7575
dag_bag.sync_to_db("dags-folder", None)
7676

77-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
78-
7977
self.dag.create_dagrun(
8078
run_id=self.dag_run_id,
8179
logical_date=self.default_time,
8280
run_type=DagRunType.MANUAL,
8381
state=DagRunState.SUCCESS,
8482
data_interval=(timezone.datetime(2020, 1, 1), timezone.datetime(2020, 1, 2)),
85-
**triggered_by_kwargs,
83+
run_after=timezone.datetime(2020, 1, 2),
84+
triggered_by=DagRunTriggeredByType.TEST,
8685
)
8786

8887
def teardown_method(self) -> None:

tests/cli/commands/remote_commands/test_task_command.py

+16-50
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,10 @@
5050
from airflow.utils import timezone
5151
from airflow.utils.session import create_session
5252
from airflow.utils.state import State, TaskInstanceState
53-
from airflow.utils.types import DagRunType
53+
from airflow.utils.types import DagRunTriggeredByType, DagRunType
5454

5555
from tests_common.test_utils.config import conf_vars
5656
from tests_common.test_utils.db import clear_db_pools, clear_db_runs, parse_and_sync_to_db
57-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
58-
59-
if AIRFLOW_V_3_0_PLUS:
60-
from airflow.utils.types import DagRunTriggeredByType
6157

6258
pytestmark = pytest.mark.db_test
6359

@@ -104,21 +100,15 @@ def setup_class(cls):
104100
cls.dagbag = DagBag(read_dags_from_db=True)
105101
cls.dag = cls.dagbag.get_dag(cls.dag_id)
106102
data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE)
107-
v3_kwargs = (
108-
{
109-
"dag_version": None,
110-
"triggered_by": DagRunTriggeredByType.TEST,
111-
}
112-
if AIRFLOW_V_3_0_PLUS
113-
else {}
114-
)
115103
cls.dag_run = cls.dag.create_dagrun(
116104
state=State.NONE,
117105
run_id=cls.run_id,
118106
run_type=DagRunType.MANUAL,
119107
logical_date=DEFAULT_DATE,
120108
data_interval=data_interval,
121-
**v3_kwargs,
109+
run_after=DEFAULT_DATE,
110+
dag_version=None,
111+
triggered_by=DagRunTriggeredByType.TEST,
122112
)
123113

124114
@classmethod
@@ -175,22 +165,16 @@ def test_cli_test_different_path(self, session, tmp_path):
175165

176166
logical_date = pendulum.now("UTC")
177167
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
178-
v3_kwargs = (
179-
{
180-
"dag_version": None,
181-
"triggered_by": DagRunTriggeredByType.TEST,
182-
}
183-
if AIRFLOW_V_3_0_PLUS
184-
else {}
185-
)
186168
dag.create_dagrun(
187169
state=State.NONE,
188170
run_id="abc123",
189171
run_type=DagRunType.MANUAL,
190172
logical_date=logical_date,
191173
data_interval=data_interval,
174+
run_after=logical_date,
175+
dag_version=None,
176+
triggered_by=DagRunTriggeredByType.TEST,
192177
session=session,
193-
**v3_kwargs,
194178
)
195179
session.commit()
196180

@@ -647,22 +631,16 @@ def test_task_states_for_dag_run(self):
647631
default_date2 = timezone.datetime(2016, 1, 9)
648632
dag2.clear()
649633
data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2)
650-
v3_kwargs = (
651-
{
652-
"dag_version": None,
653-
"triggered_by": DagRunTriggeredByType.CLI,
654-
}
655-
if AIRFLOW_V_3_0_PLUS
656-
else {}
657-
)
658634
dagrun = dag2.create_dagrun(
659635
run_id="test",
660636
state=State.RUNNING,
661637
logical_date=default_date2,
662638
data_interval=data_interval,
639+
run_after=default_date2,
663640
run_type=DagRunType.MANUAL,
664641
external_trigger=True,
665-
**v3_kwargs,
642+
dag_version=None,
643+
triggered_by=DagRunTriggeredByType.CLI,
666644
)
667645
ti2 = TaskInstance(task2, run_id=dagrun.run_id)
668646
ti2.set_state(State.SUCCESS)
@@ -736,22 +714,16 @@ def setup_method(self) -> None:
736714

737715
dag = DagBag().get_dag(self.dag_id)
738716
data_interval = dag.timetable.infer_manual_data_interval(run_after=self.logical_date)
739-
v3_kwargs = (
740-
{
741-
"dag_version": None,
742-
"triggered_by": DagRunTriggeredByType.TEST,
743-
}
744-
if AIRFLOW_V_3_0_PLUS
745-
else {}
746-
)
747717
self.dr = dag.create_dagrun(
748718
run_id=self.run_id,
749719
logical_date=self.logical_date,
750720
data_interval=data_interval,
721+
run_after=self.logical_date,
751722
start_date=timezone.utcnow(),
752723
state=State.RUNNING,
753724
run_type=DagRunType.MANUAL,
754-
**v3_kwargs,
725+
dag_version=None,
726+
triggered_by=DagRunTriggeredByType.TEST,
755727
)
756728
self.tis = self.dr.get_task_instances()
757729
assert len(self.tis) == 1
@@ -1049,22 +1021,16 @@ def test_context_with_run():
10491021
dag = DagBag().get_dag(dag_id)
10501022
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
10511023

1052-
v3_kwargs = (
1053-
{
1054-
"dag_version": None,
1055-
"triggered_by": DagRunTriggeredByType.TEST,
1056-
}
1057-
if AIRFLOW_V_3_0_PLUS
1058-
else {}
1059-
)
10601024
dag.create_dagrun(
10611025
run_id=run_id,
10621026
logical_date=logical_date,
10631027
data_interval=data_interval,
1028+
run_after=logical_date,
10641029
start_date=timezone.utcnow(),
10651030
state=State.RUNNING,
10661031
run_type=DagRunType.MANUAL,
1067-
**v3_kwargs,
1032+
dag_version=None,
1033+
triggered_by=DagRunTriggeredByType.TEST,
10681034
)
10691035
with conf_vars({("core", "dags_folder"): dag_path}):
10701036
task_command.task_run(parser.parse_args(task_args))

tests/dag_processing/test_processor.py

+1
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure,
9999
logical_date=DEFAULT_DATE,
100100
run_type=DagRunType.SCHEDULED,
101101
data_interval=dag.infer_automated_data_interval(DEFAULT_DATE),
102+
run_after=DEFAULT_DATE,
102103
triggered_by=DagRunTriggeredByType.TEST,
103104
session=session,
104105
)

tests/decorators/test_python.py

+5-9
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,10 @@
3838
from airflow.utils.task_group import TaskGroup
3939
from airflow.utils.task_instance_session import set_current_task_instance_session
4040
from airflow.utils.trigger_rule import TriggerRule
41-
from airflow.utils.types import DagRunType
41+
from airflow.utils.types import DagRunTriggeredByType, DagRunType
4242
from airflow.utils.xcom import XCOM_RETURN_KEY
4343

4444
from providers.tests.standard.operators.test_python import BasePythonTest
45-
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
46-
47-
if AIRFLOW_V_3_0_PLUS:
48-
from airflow.utils.types import DagRunTriggeredByType
4945

5046
pytestmark = pytest.mark.db_test
5147

@@ -442,7 +438,6 @@ def return_dict(number: int):
442438
with self.dag_non_serialized:
443439
ret = return_dict(test_number)
444440

445-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
446441
dr = self.dag_non_serialized.create_dagrun(
447442
run_id="test",
448443
run_type=DagRunType.MANUAL,
@@ -452,7 +447,8 @@ def return_dict(number: int):
452447
data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval(
453448
run_after=DEFAULT_DATE
454449
),
455-
**triggered_by_kwargs,
450+
run_after=DEFAULT_DATE,
451+
triggered_by=DagRunTriggeredByType.TEST,
456452
)
457453

458454
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
@@ -507,7 +503,6 @@ def add_num(number: int, num2: int = 2):
507503
bigger_number = add_2(test_number)
508504
ret = add_num(bigger_number, XComArg(bigger_number.operator))
509505

510-
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
511506
dr = self.dag_non_serialized.create_dagrun(
512507
run_id="test",
513508
run_type=DagRunType.MANUAL,
@@ -517,7 +512,8 @@ def add_num(number: int, num2: int = 2):
517512
data_interval=self.dag_non_serialized.timetable.infer_manual_data_interval(
518513
run_after=DEFAULT_DATE
519514
),
520-
**triggered_by_kwargs,
515+
run_after=DEFAULT_DATE,
516+
triggered_by=DagRunTriggeredByType.TEST,
521517
)
522518

523519
bigger_number.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

0 commit comments

Comments
 (0)