diff --git a/airflow/api/common/trigger_dag.py b/airflow/api/common/trigger_dag.py index fcc335b8b584c..d04a790c02f88 100644 --- a/airflow/api/common/trigger_dag.py +++ b/airflow/api/common/trigger_dag.py @@ -101,12 +101,13 @@ def _trigger_dag( dag_run = dag.create_dagrun( run_id=run_id, logical_date=logical_date, - state=DagRunState.QUEUED, + data_interval=data_interval, conf=run_conf, + run_type=DagRunType.MANUAL, + triggered_by=triggered_by, external_trigger=True, dag_version=dag_version, - data_interval=data_interval, - triggered_by=triggered_by, + state=DagRunState.QUEUED, session=session, ) diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index 4574a7b77c76c..0fd7f537a5e40 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -347,18 +347,17 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse: ) else: data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - dag_version = DagVersion.get_latest_version(dag.dag_id) dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, run_id=run_id, logical_date=logical_date, data_interval=data_interval, - state=DagRunState.QUEUED, conf=post_body.get("conf"), + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, external_trigger=True, - dag_version=dag_version, + dag_version=DagVersion.get_latest_version(dag.dag_id), + state=DagRunState.QUEUED, session=session, - triggered_by=DagRunTriggeredByType.REST_API, ) dag_run_note = post_body.get("note") if dag_run_note: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 5c87ce145620e..9b8cfd6727bdc 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -342,7 +342,6 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - run_id = body.dag_run_id logical_date = pendulum.instance(body.logical_date) try: @@ -355,18 +354,27 @@ def trigger_dag_run( ) else: data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - dag_version = DagVersion.get_latest_version(dag.dag_id) + + if body.dag_run_id: + run_id = body.dag_run_id + else: + run_id = dag.timetable.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date, + data_interval=data_interval, + ) + dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, run_id=run_id, logical_date=logical_date, data_interval=data_interval, - state=DagRunState.QUEUED, conf=body.conf, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, external_trigger=True, - dag_version=dag_version, + dag_version=DagVersion.get_latest_version(dag.dag_id), + state=DagRunState.QUEUED, session=session, - triggered_by=DagRunTriggeredByType.REST_API, ) dag_run_note = body.note if dag_run_note: diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py index 2329ee25bccaa..51198af74961a 100644 --- a/airflow/cli/commands/remote_commands/task_command.py +++ b/airflow/cli/commands/remote_commands/task_command.py @@ -66,7 +66,7 @@ from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import DagRunState from airflow.utils.task_instance_session import set_current_task_instance_session -from airflow.utils.types import DagRunTriggeredByType +from airflow.utils.types import DagRunTriggeredByType, DagRunType if TYPE_CHECKING: from typing import Literal @@ -180,12 +180,14 @@ def _get_dag_run( return dag_run, True elif create_if_necessary == "db": dag_run = dag.create_dagrun( - state=DagRunState.QUEUED, - logical_date=dag_run_logical_date, run_id=_generate_temporary_run_id(), + logical_date=dag_run_logical_date, data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date), - session=session, + run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.CLI, + dag_version=None, + state=DagRunState.QUEUED, + session=session, ) return dag_run, True raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}") diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 4718b824830b7..f8a2eb329537b 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1344,15 +1344,19 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) - if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns: try: dag.create_dagrun( - run_type=DagRunType.SCHEDULED, + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.SCHEDULED, + logical_date=dag_model.next_dagrun, + data_interval=data_interval, + ), logical_date=dag_model.next_dagrun, - state=DagRunState.QUEUED, data_interval=data_interval, - external_trigger=False, - session=session, + run_type=DagRunType.SCHEDULED, + triggered_by=DagRunTriggeredByType.TIMETABLE, dag_version=latest_dag_version, + state=DagRunState.QUEUED, creating_job_id=self.job.id, - triggered_by=DagRunTriggeredByType.TIMETABLE, + session=session, ) active_runs_of_dags[dag.dag_id] += 1 # Exceptions like ValueError, ParamValidationError, etc. are raised by @@ -1448,25 +1452,22 @@ def _create_dag_runs_asset_triggered( ).all() data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events) - run_id = dag.timetable.generate_run_id( - run_type=DagRunType.ASSET_TRIGGERED, - logical_date=logical_date, - data_interval=data_interval, - session=session, - events=asset_events, - ) - dag_run = dag.create_dagrun( - run_id=run_id, - run_type=DagRunType.ASSET_TRIGGERED, + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.ASSET_TRIGGERED, + logical_date=logical_date, + data_interval=data_interval, + session=session, + events=asset_events, + ), logical_date=logical_date, data_interval=data_interval, - state=DagRunState.QUEUED, - external_trigger=False, - session=session, + run_type=DagRunType.ASSET_TRIGGERED, + triggered_by=DagRunTriggeredByType.ASSET, dag_version=latest_dag_version, + state=DagRunState.QUEUED, creating_job_id=self.job.id, - triggered_by=DagRunTriggeredByType.ASSET, + session=session, ) Stats.incr("asset.triggered_dagruns") dag_run.consumed_asset_events.extend(asset_events) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 39a28379a6526..3003b1a534782 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -54,6 +54,8 @@ if TYPE_CHECKING: from datetime import datetime + from airflow.models.dag import DAG + from airflow.timetables.base import DagRunInfo log = logging.getLogger(__name__) @@ -223,8 +225,8 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess def _create_backfill_dag_run( *, - dag, - info, + dag: DAG, + info: DagRunInfo, reprocess_behavior: ReprocessBehavior, backfill_id, dag_run_conf, @@ -257,18 +259,21 @@ def _create_backfill_dag_run( return dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) dr = dag.create_dagrun( - triggered_by=DagRunTriggeredByType.BACKFILL, + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.BACKFILL_JOB, + logical_date=info.logical_date, + data_interval=info.data_interval, + ), logical_date=info.logical_date, data_interval=info.data_interval, - start_date=timezone.utcnow(), - state=DagRunState.QUEUED, - external_trigger=False, conf=dag_run_conf, run_type=DagRunType.BACKFILL_JOB, - creating_job_id=None, - session=session, - backfill_id=backfill_id, + triggered_by=DagRunTriggeredByType.BACKFILL, dag_version=dag_version, + state=DagRunState.QUEUED, + start_date=timezone.utcnow(), + backfill_id=backfill_id, + session=session, ) session.add( BackfillDagRun( diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 0f7d4d4925c32..9c3451dc63777 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -246,31 +246,24 @@ def _triggerer_is_healthy(): @provide_session def _create_orm_dagrun( - dag, - dag_id, - run_id, - logical_date, - start_date, - external_trigger, - conf, - state, - run_type, - dag_version, - creating_job_id, - data_interval, - backfill_id, - session, - triggered_by, -): - bundle_version = session.scalar( - select( - DagModel.latest_bundle_version, - ).where( - DagModel.dag_id == dag.dag_id, - ) - ) + *, + dag: DAG, + run_id: str, + logical_date: datetime | None, + data_interval: DataInterval | None, + start_date: datetime | None, + external_trigger: bool, + conf: Any, + state: DagRunState | None, + run_type: DagRunType, + dag_version: DagVersion | None, + creating_job_id: int | None, + backfill_id: int | None, + triggered_by: DagRunTriggeredByType, + session: Session = NEW_SESSION, +) -> DagRun: run = DagRun( - dag_id=dag_id, + dag_id=dag.dag_id, run_id=run_id, logical_date=logical_date, start_date=start_date, @@ -283,7 +276,9 @@ def _create_orm_dagrun( data_interval=data_interval, triggered_by=triggered_by, backfill_id=backfill_id, - bundle_version=bundle_version, + bundle_version=session.scalar( + select(DagModel.latest_bundle_version).where(DagModel.dag_id == dag.dag_id) + ), ) # Load defaults into the following two fields to ensure result can be serialized detached run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id)))) @@ -1735,86 +1730,63 @@ def add_logger_if_needed(ti: TaskInstance): @provide_session def create_dagrun( self, - state: DagRunState, *, - triggered_by: DagRunTriggeredByType | None, - logical_date: datetime | None = None, - run_id: str | None = None, - start_date: datetime | None = None, - external_trigger: bool | None = False, + run_id: str, + logical_date: datetime, + data_interval: tuple[datetime, datetime], conf: dict | None = None, - run_type: DagRunType | None = None, - session: Session = NEW_SESSION, + run_type: DagRunType, + triggered_by: DagRunTriggeredByType, + external_trigger: bool = False, dag_version: DagVersion | None = None, + state: DagRunState, + start_date: datetime | None = None, creating_job_id: int | None = None, - data_interval: tuple[datetime, datetime] | None = None, backfill_id: int | None = None, - ): + session: Session = NEW_SESSION, + ) -> DagRun: """ - Create a dag run from this dag including the tasks associated with this dag. + Create a run for this DAG to run its tasks. - Returns the dag run. - - :param state: the state of the dag run - :param triggered_by: The entity which triggers the DagRun - :param run_id: defines the run id for this dag run - :param run_type: type of DagRun - :param logical_date: the logical date of this dag run :param start_date: the date this dag run should be evaluated - :param external_trigger: whether this dag run is externally triggered :param conf: Dict containing configuration/parameters to pass to the DAG - :param creating_job_id: id of the job creating this DagRun - :param session: database session - :param dag_version: The DagVersion object for this run - :param data_interval: Data interval of the DagRun - :param backfill_id: id of the backfill run if one exists + :param creating_job_id: ID of the job creating this DagRun + :param backfill_id: ID of the backfill run if one exists + :return: The created DAG run. + + :meta private: """ logical_date = timezone.coerce_datetime(logical_date) if data_interval and not isinstance(data_interval, DataInterval): data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) - if data_interval is None and logical_date is not None: - raise ValueError( - "Calling `DAG.create_dagrun()` without an explicit data interval is not supported." - ) - - if run_type is None or isinstance(run_type, DagRunType): + if isinstance(run_type, DagRunType): pass - elif isinstance(run_type, str): # Compatibility: run_type used to be a str. + elif isinstance(run_type, str): # Ensure the input value is valid. run_type = DagRunType(run_type) else: - raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}") - - if run_id: # Infer run_type from run_id if needed. - if not isinstance(run_id, str): - raise ValueError(f"`run_id` should be a str, not {type(run_id)}") - inferred_run_type = DagRunType.from_run_id(run_id) - if run_type is None: - # No explicit type given, use the inferred type. - run_type = inferred_run_type - elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL: - # Prevent a manual run from using an ID that looks like a scheduled run. + raise ValueError(f"run_type should be a DagRunType, not {type(run_type)}") + + if not isinstance(run_id, str): + raise ValueError(f"`run_id` should be a str, not {type(run_id)}") + + # This is also done on the DagRun model class, but SQLAlchemy column + # validator does not work well for some reason. + if not re2.match(RUN_ID_REGEX, run_id): + regex = airflow_conf.get("scheduler", "allowed_run_id_pattern").strip() + if not regex or not re2.match(regex, run_id): raise ValueError( - f"A {run_type.value} DAG run cannot use ID {run_id!r} since it " - f"is reserved for {inferred_run_type.value} runs" + f"The run_id provided '{run_id}' does not match regex pattern " + f"'{regex}' or '{RUN_ID_REGEX}'" ) - elif run_type and logical_date is not None: # Generate run_id from run_type and logical_date. - run_id = self.timetable.generate_run_id( - run_type=run_type, logical_date=logical_date, data_interval=data_interval - ) - else: - raise AirflowException( - "Creating DagRun needs either `run_id` or both `run_type` and `logical_date`" - ) - - regex = airflow_conf.get("scheduler", "allowed_run_id_pattern") - if run_id and not re2.match(RUN_ID_REGEX, run_id): - if not regex.strip() or not re2.match(regex.strip(), run_id): - raise AirflowException( - f"The provided run ID '{run_id}' is invalid. It does not match either " - f"the configured pattern: '{regex}' or the built-in pattern: '{RUN_ID_REGEX}'" + # Prevent a manual run from using an ID that looks like a scheduled run. + if run_type == DagRunType.MANUAL: + if (inferred_run_type := DagRunType.from_run_id(run_id)) != DagRunType.MANUAL: + raise ValueError( + f"A {run_type.value} DAG run cannot use ID {run_id!r} since it " + f"is reserved for {inferred_run_type.value} runs" ) # todo: AIP-78 add verification that if run type is backfill then we have a backfill id @@ -1824,15 +1796,15 @@ def create_dagrun( assert self.params # create a copy of params before validating copied_params = copy.deepcopy(self.params) - copied_params.update(conf or {}) + if conf: + copied_params.update(conf) copied_params.validate() - run = _create_orm_dagrun( + return _create_orm_dagrun( dag=self, - dag_id=self.dag_id, run_id=run_id, logical_date=logical_date, - start_date=start_date, + start_date=timezone.coerce_datetime(start_date), external_trigger=external_trigger, conf=conf, state=state, @@ -1841,10 +1813,9 @@ def create_dagrun( creating_job_id=creating_job_id, backfill_id=backfill_id, data_interval=data_interval, - session=session, triggered_by=triggered_by, + session=session, ) - return run @classmethod @provide_session @@ -2487,14 +2458,15 @@ def _run_task( def _get_or_create_dagrun( + *, dag: DAG, - conf: dict[Any, Any] | None, - start_date: datetime, - logical_date: datetime, run_id: str, - session: Session, + logical_date: datetime, + data_interval: tuple[datetime, datetime], + conf: dict | None, triggered_by: DagRunTriggeredByType, - data_interval: tuple[datetime, datetime] | None = None, + start_date: datetime, + session: Session, ) -> DagRun: """ Create a DAG run, replacing an existing instance if needed to prevent collisions. @@ -2510,24 +2482,23 @@ def _get_or_create_dagrun( :return: The newly created DAG run. """ - log.info("dagrun id: %s", dag.dag_id) dr: DagRun = session.scalar( select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.logical_date == logical_date) ) if dr: session.delete(dr) session.commit() - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) dr = dag.create_dagrun( - state=DagRunState.RUNNING, - logical_date=logical_date, run_id=run_id, - start_date=start_date or logical_date, - session=session, - conf=conf, + logical_date=logical_date, data_interval=data_interval, + conf=conf, + run_type=DagRunType.MANUAL, + state=DagRunState.RUNNING, triggered_by=triggered_by, - dag_version=dag_version, + dag_version=DagVersion.get_latest_version(dag.dag_id, session=session), + start_date=start_date or logical_date, + session=session, ) log.info("created dagrun %s", dr) return dr diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e7aad18672d5c..8060901f948a3 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -278,12 +278,14 @@ def __repr__(self): def validate_run_id(self, key: str, run_id: str) -> str | None: if not run_id: return None - regex = airflow_conf.get("scheduler", "allowed_run_id_pattern") - if not re2.match(regex, run_id) and not re2.match(RUN_ID_REGEX, run_id): - raise ValueError( - f"The run_id provided '{run_id}' does not match the pattern '{regex}' or '{RUN_ID_REGEX}'" - ) - return run_id + if re2.match(RUN_ID_REGEX, run_id): + return run_id + regex = airflow_conf.get("scheduler", "allowed_run_id_pattern").strip() + if regex and re2.match(regex, run_id): + return run_id + raise ValueError( + f"The run_id provided '{run_id}' does not match regex pattern '{regex}' or '{RUN_ID_REGEX}'" + ) @property def stats_tags(self) -> dict[str, str]: diff --git a/airflow/www/views.py b/airflow/www/views.py index 01b10b98aff64..d2751d18046d4 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2221,18 +2221,26 @@ def trigger(self, dag_id: str, session: Session = NEW_SESSION): "warning", ) + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + if not run_id: + run_id = dag.timetable.generate_run_id( + logical_date=logical_date, + data_interval=data_interval, + run_type=DagRunType.MANUAL, + ) + try: - dag_version = DagVersion.get_latest_version(dag.dag_id) dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, + run_id=run_id, logical_date=logical_date, - data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date), - state=DagRunState.QUEUED, + data_interval=data_interval, conf=run_conf, - external_trigger=True, - dag_version=dag_version, - run_id=run_id, + run_type=DagRunType.MANUAL, triggered_by=DagRunTriggeredByType.UI, + external_trigger=True, + dag_version=DagVersion.get_latest_version(dag.dag_id), + state=DagRunState.QUEUED, + session=session, ) except (ValueError, ParamValidationError) as ve: flash(f"{ve}", "error") diff --git a/dev/perf/scheduler_dag_execution_timing.py b/dev/perf/scheduler_dag_execution_timing.py index f8760f4c03c8a..3c1fbd0395c05 100755 --- a/dev/perf/scheduler_dag_execution_timing.py +++ b/dev/perf/scheduler_dag_execution_timing.py @@ -172,11 +172,14 @@ def create_dag_runs(dag, num_runs, session): dag.create_dagrun( run_id=f"{id_prefix}{logical_date.isoformat()}", logical_date=logical_date, - start_date=timezone.utcnow(), - state=DagRunState.RUNNING, + data_interval=(logical_date, logical_date), + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.TEST, external_trigger=False, + dag_version=None, + state=DagRunState.RUNNING, + start_date=timezone.utcnow(), session=session, - triggered_by=DagRunTriggeredByType.TEST, ) last_dagrun_data_interval = next_info.data_interval @@ -292,7 +295,8 @@ def main(num_runs, repeat, pre_create_dag_runs, executor_class, dag_ids): # Need a lambda to refer to the _latest_ value for scheduler_job, not just # the initial one - code_to_test = lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute) + def code_to_test(): + run_job(job=job_runner.job, execute_callable=job_runner._execute) for count in range(repeat): if not count: diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 0f4b77001f20b..37092f31a6d7b 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -725,13 +725,14 @@ This is an example test want to verify the structure of a code-generated DAG aga from airflow import DAG from airflow.utils.state import DagRunState, TaskInstanceState - from airflow.utils.types import DagRunType + from airflow.utils.types import DagRunTriggeredByType, DagRunType DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) TEST_DAG_ID = "my_custom_operator_dag" TEST_TASK_ID = "my_custom_operator_task" + TEST_RUN_ID = "my_custom_operator_dag_run" @pytest.fixture() @@ -750,11 +751,13 @@ This is an example test want to verify the structure of a code-generated DAG aga def test_my_custom_operator_execute_no_trigger(dag): dagrun = dag.create_dagrun( - state=DagRunState.RUNNING, - execution_date=DATA_INTERVAL_START, + run_id=TEST_RUN_ID, + logical_date=DATA_INTERVAL_START, data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END), - start_date=DATA_INTERVAL_END, run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.TIMETABLE, + state=DagRunState.RUNNING, + start_date=DATA_INTERVAL_END, ) ti = dagrun.get_task_instance(task_id=TEST_TASK_ID) ti.task = dag.get_task(task_id=TEST_TASK_ID) diff --git a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py index f18a4b88eedbe..9cbebcf8df9ec 100644 --- a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py +++ b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py @@ -122,23 +122,21 @@ def task_callable(ti): python_callable=task_callable, executor_config={"pod_override": pod_override}, ) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs: dict = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - execution_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dagrun = dag.create_dagrun( + run_id="test", + run_type=DagRunType.MANUAL, + state=State.RUNNING, + data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), + **dagrun_kwargs, + ) ti = TaskInstance(task=task, run_id=dagrun.run_id) ti.try_number = 3 @@ -160,7 +158,7 @@ def task_callable(ti): ( "dag_id=dag_for_testing_file_task_handler," "kubernetes_executor=True," - "run_id=manual__2016-01-01T0000000000-2b88d1d57," + "run_id=test," "task_id=task_for_testing_file_log_handler," "try_number=2," "airflow-worker" diff --git a/providers/tests/common/sql/operators/test_sql.py b/providers/tests/common/sql/operators/test_sql.py index 133d51ac75753..b0eedb52c47c2 100644 --- a/providers/tests/common/sql/operators/test_sql.py +++ b/providers/tests/common/sql/operators/test_sql.py @@ -44,6 +44,7 @@ from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import State +from airflow.utils.types import DagRunType from tests_common.test_utils.providers import get_provider_min_airflow_version from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -1158,26 +1159,23 @@ def test_branch_single_value_with_dag_run(self, mock_get_db_hook): self.branch_1.set_upstream(branch_op) self.branch_2.set_upstream(branch_op) self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first @@ -1211,25 +1209,22 @@ def test_branch_true_with_dag_run(self, mock_get_db_hook): self.branch_1.set_upstream(branch_op) self.branch_2.set_upstream(branch_op) self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first @@ -1264,26 +1259,22 @@ def test_branch_false_with_dag_run(self, mock_get_db_hook): self.branch_1.set_upstream(branch_op) self.branch_2.set_upstream(branch_op) self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first @@ -1319,26 +1310,22 @@ def test_branch_list_with_dag_run(self, mock_get_db_hook): self.branch_3 = EmptyOperator(task_id="branch_3", dag=self.dag) self.branch_3.set_upstream(branch_op) self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first mock_get_records.return_value = [["1"]] @@ -1371,26 +1358,22 @@ def test_invalid_query_result_with_dag_run(self, mock_get_db_hook): self.branch_1.set_upstream(branch_op) self.branch_2.set_upstream(branch_op) self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} if AIRFLOW_V_3_0_PLUS: - self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first @@ -1414,25 +1397,22 @@ def test_with_skip_in_branch_downstream_dependencies(self, mock_get_db_hook): branch_op >> self.branch_1 >> self.branch_2 branch_op >> self.branch_2 self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first @@ -1465,25 +1445,22 @@ def test_with_skip_in_branch_downstream_dependencies2(self, mock_get_db_hook): branch_op >> self.branch_1 >> self.branch_2 branch_op >> self.branch_2 self.dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - logical_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } else: - dr = self.dag.create_dagrun( - run_id="manual__", - start_date=timezone.utcnow(), - execution_date=DEFAULT_DATE, - state=State.RUNNING, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + dagrun_kwargs = {"execution_date": DEFAULT_DATE} + dr = self.dag.create_dagrun( + run_id="manual__", + run_type=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + **dagrun_kwargs, + ) mock_get_records = mock_get_db_hook.return_value.get_first diff --git a/providers/tests/openlineage/plugins/test_execution.py b/providers/tests/openlineage/plugins/test_execution.py index 039064e7053f6..6eb84a5d4012d 100644 --- a/providers/tests/openlineage/plugins/test_execution.py +++ b/providers/tests/openlineage/plugins/test_execution.py @@ -34,8 +34,10 @@ from airflow.providers.openlineage.plugins.listener import OpenLineageListener from airflow.utils import timezone from airflow.utils.state import State +from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_runs from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: @@ -72,6 +74,9 @@ def has_value_in_events(events, chain, value): @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 2.10+") @pytest.mark.usefixtures("reset_logging_config") class TestOpenLineageExecution: + def teardown_method(self): + clear_db_runs() + @pytest.fixture(autouse=True) def clean_listener_manager(self): get_listener_manager().clear() @@ -93,13 +98,20 @@ def setup_job(self, task_name, run_id): dag = dagbag.dags.get("test_openlineage_execution") task = dag.get_task(task_name) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": DEFAULT_DATE} dag.create_dagrun( run_id=run_id, + run_type=DagRunType.MANUAL, data_interval=(DEFAULT_DATE, DEFAULT_DATE), state=State.RUNNING, start_date=DEFAULT_DATE, - **triggered_by_kwargs, + **dagrun_kwargs, ) ti = TaskInstance(task=task, run_id=run_id) job = Job(id=random.randint(0, 23478197), dag_id=ti.dag_id) @@ -192,13 +204,20 @@ def test_success_overtime_kills_tasks(self): dag = dagbag.dags.get("test_openlineage_execution") task = dag.get_task("execute_long_stall") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs = { + "logical_date": DEFAULT_DATE, + "triggered_by": DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": DEFAULT_DATE} dag.create_dagrun( run_id="test_long_stalled_task_is_killed_by_listener_overtime_if_ol_timeout_long_enough", + run_type=DagRunType.MANUAL, data_interval=(DEFAULT_DATE, DEFAULT_DATE), state=State.RUNNING, start_date=DEFAULT_DATE, - **triggered_by_kwargs, + **dagrun_kwargs, ) ti = TaskInstance( task=task, diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py index eeca49c7f8d1b..837873f439d24 100644 --- a/providers/tests/openlineage/plugins/test_listener.py +++ b/providers/tests/openlineage/plugins/test_listener.py @@ -36,15 +36,14 @@ from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet from airflow.providers.openlineage.plugins.listener import OpenLineageListener from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage +from airflow.utils import types from airflow.utils.state import DagRunState, State from tests_common.test_utils.compat import PythonOperator from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.db import clear_db_runs from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0 @@ -80,20 +79,30 @@ def regular_call(self, callable, callable_name, use_fork): def test_listener_does_not_change_task_instance(render_mock, xcom_push_mock): render_mock.return_value = render_df() + date = dt.datetime(2022, 1, 1) dag = DAG( "test", schedule=None, - start_date=dt.datetime(2022, 1, 1), + start_date=date, user_defined_macros={"render_df": render_df}, params={"df": {"col": [1, 2]}}, ) t = TemplateOperator(task_id="template_op", dag=dag, do_xcom_push=True, df=dag.param("df")) run_id = str(uuid.uuid1()) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs = { + "dag_version": None, + "logical_date": date, + "triggered_by": types.DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": date} dag.create_dagrun( - state=State.NONE, run_id=run_id, - **triggered_by_kwargs, + data_interval=(date, date), + run_type=types.DagRunType.MANUAL, + state=DagRunState.QUEUED, + **dagrun_kwargs, ) ti = TaskInstance(t, run_id=run_id) ti.check_and_change_state_before_execution() # make listener hook on running event @@ -146,8 +155,9 @@ def _create_test_dag_and_task(python_callable: Callable, scenario_name: str) -> :return: TaskInstance: The created TaskInstance object. - This function creates a DAG and a PythonOperator task with the provided python_callable. It generates a unique - run ID and creates a DAG run. This setup is useful for testing different scenarios in Airflow tasks. + This function creates a DAG and a PythonOperator task with the provided + python_callable. It generates a unique run ID and creates a DAG run. This + setup is useful for testing different scenarios in Airflow tasks. :Example: @@ -157,18 +167,28 @@ def sample_callable(**kwargs): task_instance = _create_test_dag_and_task(sample_callable, "sample_scenario") # Use task_instance to simulate running a task in a test. """ + date = dt.datetime(2022, 1, 1) dag = DAG( f"test_{scenario_name}", schedule=None, - start_date=dt.datetime(2022, 1, 1), + start_date=date, ) t = PythonOperator(task_id=f"test_task_{scenario_name}", dag=dag, python_callable=python_callable) run_id = str(uuid.uuid1()) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs: dict = { + "dag_version": None, + "logical_date": date, + "triggered_by": types.DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": date} dagrun = dag.create_dagrun( - state=State.NONE, # type: ignore run_id=run_id, - **triggered_by_kwargs, # type: ignore + data_interval=(date, date), + run_type=types.DagRunType.MANUAL, + state=DagRunState.QUEUED, + **dagrun_kwargs, ) task_instance = TaskInstance(t, run_id=run_id) return dagrun, task_instance @@ -669,10 +689,11 @@ def set_result(*args, **kwargs): class TestOpenLineageSelectiveEnable: def setup_method(self): + date = dt.datetime(2022, 1, 1) self.dag = DAG( "test_selective_enable", schedule=None, - start_date=dt.datetime(2022, 1, 1), + start_date=date, ) def simple_callable(**kwargs): @@ -685,16 +706,28 @@ def simple_callable(**kwargs): task_id="test_task_selective_enable_2", dag=self.dag, python_callable=simple_callable ) run_id = str(uuid.uuid1()) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs = { + "dag_version": None, + "logical_date": date, + "triggered_by": types.DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": date} self.dagrun = self.dag.create_dagrun( - state=State.NONE, run_id=run_id, - **triggered_by_kwargs, + data_interval=(date, date), + run_type=types.DagRunType.MANUAL, + state=DagRunState.QUEUED, + **dagrun_kwargs, ) # type: ignore self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, map_index=-1) self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, map_index=-1) self.task_instance_1.dag_run = self.task_instance_2.dag_run = self.dagrun + def teardown_method(self): + clear_db_runs() + @pytest.mark.parametrize( "selective_enable, enable_dag, expected_call_count", [ diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py index 490061db4f594..36b8057cf01f6 100644 --- a/providers/tests/openlineage/plugins/test_utils.py +++ b/providers/tests/openlineage/plugins/test_utils.py @@ -47,6 +47,7 @@ from airflow.utils import timezone from airflow.utils.log.secrets_masker import _secrets_masker from airflow.utils.state import State +from airflow.utils.types import DagRunType from tests_common.test_utils.compat import ( BashOperator, @@ -100,12 +101,20 @@ def test_get_dagrun_start_end(dag_maker): dag_model = DagModel.get_dagmodel(dag.dag_id) run_id = str(uuid.uuid1()) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + data_interval = dag.get_next_data_interval(dag_model) + if AIRFLOW_V_3_0_PLUS: + dagrun_kwargs = { + "logical_date": data_interval.start, + "triggered_by": DagRunTriggeredByType.TEST, + } + else: + dagrun_kwargs = {"execution_date": data_interval.start} dagrun = dag.create_dagrun( state=State.NONE, run_id=run_id, - data_interval=dag.get_next_data_interval(dag_model), - **triggered_by_kwargs, + run_type=DagRunType.MANUAL, + data_interval=data_interval, + **dagrun_kwargs, ) assert dagrun.data_interval_start is not None start_date_tz = datetime.datetime(2022, 1, 1, tzinfo=timezone.utc) diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py index c1e6b6b23d7cd..06a477e283168 100644 --- a/tests/cli/commands/remote_commands/test_task_command.py +++ b/tests/cli/commands/remote_commands/test_task_command.py @@ -104,14 +104,21 @@ def setup_class(cls): cls.dagbag = DagBag(read_dags_from_db=True) cls.dag = cls.dagbag.get_dag(cls.dag_id) data_interval = cls.dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.CLI} if AIRFLOW_V_3_0_PLUS else {} + v3_kwargs = ( + { + "dag_version": None, + "triggered_by": DagRunTriggeredByType.TEST, + } + if AIRFLOW_V_3_0_PLUS + else {} + ) cls.dag_run = cls.dag.create_dagrun( state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, data_interval=data_interval, - **triggered_by_kwargs, + **v3_kwargs, ) @classmethod @@ -168,7 +175,14 @@ def test_cli_test_different_path(self, session, tmp_path): logical_date = pendulum.now("UTC") data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + v3_kwargs = ( + { + "dag_version": None, + "triggered_by": DagRunTriggeredByType.TEST, + } + if AIRFLOW_V_3_0_PLUS + else {} + ) dag.create_dagrun( state=State.NONE, run_id="abc123", @@ -176,7 +190,7 @@ def test_cli_test_different_path(self, session, tmp_path): logical_date=logical_date, data_interval=data_interval, session=session, - **triggered_by_kwargs, + **v3_kwargs, ) session.commit() @@ -633,14 +647,22 @@ def test_task_states_for_dag_run(self): default_date2 = timezone.datetime(2016, 1, 9) dag2.clear() data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.CLI} if AIRFLOW_V_3_0_PLUS else {} + v3_kwargs = ( + { + "dag_version": None, + "triggered_by": DagRunTriggeredByType.CLI, + } + if AIRFLOW_V_3_0_PLUS + else {} + ) dagrun = dag2.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=default_date2, data_interval=data_interval, run_type=DagRunType.MANUAL, external_trigger=True, - **triggered_by_kwargs, + **v3_kwargs, ) ti2 = TaskInstance(task2, run_id=dagrun.run_id) ti2.set_state(State.SUCCESS) @@ -714,7 +736,14 @@ def setup_method(self) -> None: dag = DagBag().get_dag(self.dag_id) data_interval = dag.timetable.infer_manual_data_interval(run_after=self.logical_date) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + v3_kwargs = ( + { + "dag_version": None, + "triggered_by": DagRunTriggeredByType.TEST, + } + if AIRFLOW_V_3_0_PLUS + else {} + ) self.dr = dag.create_dagrun( run_id=self.run_id, logical_date=self.logical_date, @@ -722,7 +751,7 @@ def setup_method(self) -> None: start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL, - **triggered_by_kwargs, + **v3_kwargs, ) self.tis = self.dr.get_task_instances() assert len(self.tis) == 1 @@ -1019,7 +1048,15 @@ def test_context_with_run(): dag = DagBag().get_dag(dag_id) data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + + v3_kwargs = ( + { + "dag_version": None, + "triggered_by": DagRunTriggeredByType.TEST, + } + if AIRFLOW_V_3_0_PLUS + else {} + ) dag.create_dagrun( run_id=run_id, logical_date=logical_date, @@ -1027,7 +1064,7 @@ def test_context_with_run(): start_date=timezone.utcnow(), state=State.RUNNING, run_type=DagRunType.MANUAL, - **triggered_by_kwargs, + **v3_kwargs, ) with conf_vars({("core", "dags_folder"): dag_path}): task_command.task_run(parser.parse_args(task_args)) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 377666427ab4e..6ffe935348e6d 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -444,7 +444,8 @@ def return_dict(number: int): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = self.dag_non_serialized.create_dagrun( - run_id=DagRunType.MANUAL, + run_id="test", + run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=DEFAULT_DATE, state=State.RUNNING, @@ -508,7 +509,8 @@ def add_num(number: int, num2: int = 2): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = self.dag_non_serialized.create_dagrun( - run_id=DagRunType.MANUAL, + run_id="test", + run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=DEFAULT_DATE, state=State.RUNNING, diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 3ca565cf5e2a5..d229abfe34f89 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -310,6 +310,7 @@ def test_heartbeat_failed_fast(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test_heartbeat_failed_fast_run", + run_type=DagRunType.MANUAL, state=State.RUNNING, logical_date=DEFAULT_DATE, start_date=DEFAULT_DATE, @@ -347,6 +348,7 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session): data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -382,6 +384,7 @@ def test_localtaskjob_double_trigger(self): dag.clear() dr = dag.create_dagrun( run_id="test", + run_type=DagRunType.MANUAL, state=State.SUCCESS, logical_date=DEFAULT_DATE, start_date=DEFAULT_DATE, @@ -485,6 +488,7 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -520,6 +524,7 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, start_date=DEFAULT_DATE, logical_date=DEFAULT_DATE, @@ -555,6 +560,7 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -591,6 +597,7 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -627,6 +634,7 @@ def test_success_listeners_executed(self, caplog, get_test_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -666,6 +674,7 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -705,6 +714,7 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dr = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -748,6 +758,7 @@ def test_process_os_signal_calls_on_failure_callback( triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} with create_session() as session: dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, @@ -1007,6 +1018,7 @@ def task_function(): data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( + run_type=DagRunType.MANUAL, state=State.RUNNING, run_id=run_id, logical_date=logical_date, diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 328ae1742fb4a..0ea27f97bb81d 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -133,6 +133,37 @@ def _loader_mock(mock_executors): yield +@pytest.fixture +def create_dagrun(session): + triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + + def _create_dagrun( + dag: DAG, + *, + logical_date: datetime, + data_interval: DataInterval, + run_type: DagRunType, + state: DagRunState = DagRunState.RUNNING, + start_date: datetime | None = None, + ) -> DagRun: + run_id = dag.timetable.generate_run_id( + run_type=run_type, + logical_date=logical_date, + data_interval=data_interval, + ) + return dag.create_dagrun( + run_id=run_id, + logical_date=logical_date, + data_interval=data_interval, + run_type=run_type, + state=state, + start_date=start_date, + **triggered_by_kwargs, + ) + + return _create_dagrun + + @patch.dict( ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"} ) @@ -2071,33 +2102,25 @@ def _create_dagruns(): assert enqueued == 80 session.rollback() - def test_adopt_or_reset_orphaned_tasks(self, dag_maker): - session = settings.Session() - with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag: + def test_adopt_or_reset_orphaned_tasks(self, dag_maker, session): + with dag_maker("test_execute_helper_reset_orphaned_tasks", session=session): op1 = EmptyOperator(task_id="op1") - data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - dr = dag_maker.create_dagrun() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr2 = dag.create_dagrun( - run_type=DagRunType.BACKFILL_JOB, - state=State.RUNNING, - logical_date=DEFAULT_DATE + datetime.timedelta(1), - start_date=DEFAULT_DATE, - session=session, - data_interval=data_interval, - **triggered_by_kwargs, - ) scheduler_job = Job() session.add(scheduler_job) - session.commit() + session.flush() + + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) ti = dr.get_task_instance(task_id=op1.task_id, session=session) ti.state = State.QUEUED ti.queued_by_job_id = scheduler_job.id + session.flush() + + dr2 = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED) ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) ti2.state = State.QUEUED ti2.queued_by_job_id = scheduler_job.id - session.commit() + session.flush() processor = mock.MagicMock() @@ -3185,8 +3208,9 @@ def _create_dagruns(dag: DAG): next_info = dag.next_dagrun_info(None) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} assert next_info is not None - for _ in range(30): + for i in range(30): yield dag.create_dagrun( + run_id=f"scheduled_{i}", run_type=DagRunType.SCHEDULED, logical_date=next_info.logical_date, data_interval=next_info.data_interval, @@ -4214,7 +4238,8 @@ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_mak data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( - state=State.RUNNING, + run_id="test", + state=DagRunState.RUNNING, logical_date=timezone.utcnow(), run_type=DagRunType.MANUAL, session=session, @@ -4282,7 +4307,7 @@ def test_scheduler_create_dag_runs_check_existing_run(self, dag_maker): session.rollback() @conf_vars({("scheduler", "use_job_schedule"): "false"}) - def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker): + def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker, session): """Test that tasks are set to a finished state when their DAG times out""" with dag_maker( @@ -4297,11 +4322,11 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker): bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i"; done', ) - session = settings.Session() data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_version = DagVersion.get_latest_version(dag.dag_id) run1 = dag.create_dagrun( + run_id="test1", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, state=State.RUNNING, @@ -4316,6 +4341,7 @@ def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker): run1_ti.state = State.RUNNING run2 = dag.create_dagrun( + run_id="test2", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE + timedelta(seconds=10), state=State.QUEUED, @@ -5612,20 +5638,18 @@ def test_find_and_purge_zombies_nothing(self): self.job_runner._find_and_purge_zombies() executor.callback_sink.send.assert_not_called() - def test_find_and_purge_zombies(self, session, testing_dag_bundle): + @pytest.mark.usefixtures("testing_dag_bundle") + def test_find_and_purge_zombies(self, session, create_dagrun): dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py") dagbag = DagBag(dagfile) dag = dagbag.get_dag("example_branch_operator") DAG.bulk_write_to_db("testing", None, [dag]) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag_run = dag.create_dagrun( - state=DagRunState.RUNNING, + dag_run = create_dagrun( + dag, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, - session=session, data_interval=data_interval, - **triggered_by_kwargs, ) executor = MockExecutor() @@ -5668,12 +5692,11 @@ def test_find_and_purge_zombies(self, session, testing_dag_bundle): assert callback_request.ti.run_id == ti.run_id assert callback_request.ti.map_index == ti.map_index - def test_zombie_message(self, testing_dag_bundle, session): + @pytest.mark.usefixtures("testing_dag_bundle") + def test_zombie_message(self, session, create_dagrun): """ Check that the zombie message comes out as expected """ - - dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False) dagfile = os.path.join(EXAMPLE_DAGS_FOLDER, "example_branch_operator.py") dagbag = DagBag(dagfile) dag = dagbag.get_dag("example_branch_operator") @@ -5682,14 +5705,11 @@ def test_zombie_message(self, testing_dag_bundle, session): session.query(Job).delete() data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag_run = dag.create_dagrun( - state=DagRunState.RUNNING, + dag_run = create_dagrun( + dag, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, - session=session, data_interval=data_interval, - **triggered_by_kwargs, ) scheduler_job = Job(executor=MockExecutor()) @@ -5733,8 +5753,10 @@ def test_zombie_message(self, testing_dag_bundle, session): "External Executor Id": "abcdefg", } + @pytest.mark.usefixtures("testing_dag_bundle") def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor( - self, testing_dag_bundle + self, + create_dagrun, ): """ Check that the same set of failure callback with zombies are passed to the dag @@ -5749,14 +5771,12 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce dag = dagbag.get_dag("test_example_bash_operator") DAG.bulk_write_to_db("testing", None, [dag]) data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag_run = dag.create_dagrun( + dag_run = create_dagrun( + dag, state=DagRunState.RUNNING, logical_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, - session=session, data_interval=data_interval, - **triggered_by_kwargs, ) task = dag.get_task(task_id="run_this_last") diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index 9b5af0b62c5f0..1543fa347ecfc 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -644,6 +644,7 @@ def test_dags_clear(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( + run_id=f"scheduled_{i}", logical_date=DEFAULT_DATE, state=State.RUNNING, run_type=DagRunType.SCHEDULED, diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 9f599e7acd47b..30495a4ccb6db 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -112,6 +112,7 @@ from airflow.utils.types import DagRunTriggeredByType if TYPE_CHECKING: + from pendulum import DateTime from sqlalchemy.orm import Session pytestmark = pytest.mark.db_test @@ -137,6 +138,32 @@ def clear_assets(): clear_db_assets() +def _create_dagrun( + dag: DAG, + *, + logical_date: DateTime, + data_interval: DataInterval, + run_type: DagRunType, + state: DagRunState = DagRunState.RUNNING, + start_date: datetime.datetime | None = None, +) -> DagRun: + triggered_by_kwargs: dict = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + run_id = dag.timetable.generate_run_id( + run_type=run_type, + logical_date=logical_date, + data_interval=data_interval, + ) + return dag.create_dagrun( + run_id=run_id, + logical_date=logical_date, + data_interval=data_interval, + run_type=run_type, + state=state, + start_date=start_date, + **triggered_by_kwargs, + ) + + class TestDag: def setup_method(self) -> None: clear_db_runs() @@ -282,13 +309,11 @@ def test_dag_task_custom_weight_strategy(self, cls, expected): task_id="empty_task", weight_rule=cls(), ) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - state=None, - run_id="test", + dr = _create_dagrun( + dag, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, + run_type=DagRunType.MANUAL, ) ti = dr.get_task_instance(task.task_id) assert ti.priority_weight == expected @@ -299,44 +324,39 @@ def test_get_num_task_instances(self): test_dag = DAG(dag_id=test_dag_id, schedule=None, start_date=DEFAULT_DATE) test_task = EmptyOperator(task_id=test_task_id, dag=test_dag) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr1 = test_dag.create_dagrun( - state=None, - run_id="test1", + dr1 = _create_dagrun( + test_dag, + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, ) - dr2 = test_dag.create_dagrun( - state=None, - run_id="test2", + dr2 = _create_dagrun( + test_dag, + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE + datetime.timedelta(days=1), data_interval=( DEFAULT_DATE + datetime.timedelta(days=1), DEFAULT_DATE + datetime.timedelta(days=1), ), - **triggered_by_kwargs, ) - dr3 = test_dag.create_dagrun( - state=None, - run_id="test3", + dr3 = _create_dagrun( + test_dag, + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE + datetime.timedelta(days=2), data_interval=( DEFAULT_DATE + datetime.timedelta(days=2), DEFAULT_DATE + datetime.timedelta(days=2), ), - **triggered_by_kwargs, ) - dr4 = test_dag.create_dagrun( - state=None, - run_id="test4", + dr4 = _create_dagrun( + test_dag, + run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE + datetime.timedelta(days=3), data_interval=( DEFAULT_DATE + datetime.timedelta(days=2), DEFAULT_DATE + datetime.timedelta(days=2), ), - **triggered_by_kwargs, ) ti1 = TI(task=test_task, run_id=dr1.run_id) @@ -407,6 +427,8 @@ def dag_run_before(delta_h=0, type=DagRunType.SCHEDULED): state=State.SUCCESS, run_type=type, run_id=f"test_{delta_h}", + logical_date=None, + data_interval=None, session=session, **triggered_by_kwargs, ) @@ -606,6 +628,7 @@ def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self): dag.add_task(BaseOperator(task_id="task_without_start_date")) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun = dag.create_dagrun( + run_id="test", state=State.RUNNING, run_type=DagRunType.MANUAL, logical_date=DEFAULT_DATE, @@ -802,6 +825,7 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( + run_id="test", state=state, logical_date=model.next_dagrun, run_type=DagRunType.SCHEDULED, @@ -1089,6 +1113,7 @@ def test_schedule_dag_no_previous_runs(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( + run_id="test", run_type=DagRunType.SCHEDULED, logical_date=TEST_DATE, state=State.RUNNING, @@ -1128,6 +1153,7 @@ def test_dag_handle_callback_crash(self, mock_stats): with create_session() as session: dag_run = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=when, run_type=DagRunType.MANUAL, @@ -1166,6 +1192,7 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session): with create_session() as session: triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( + run_id="test", state=State.RUNNING, logical_date=TEST_DATE, run_type=DagRunType.MANUAL, @@ -1194,15 +1221,13 @@ def test_next_dagrun_after_fake_scheduled_previous(self): dag_id = "test_schedule_dag_fake_scheduled_previous" dag = DAG(dag_id=dag_id, schedule=delta, start_date=DEFAULT_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=DEFAULT_DATE)) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag.create_dagrun( + _create_dagrun( + dag, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, state=State.SUCCESS, - external_trigger=True, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, ) dag.sync_to_db() with create_session() as session: @@ -1228,13 +1253,12 @@ def test_schedule_dag_once(self): # Sync once to create the DagModel dag.sync_to_db() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag.create_dagrun( + _create_dagrun( + dag, run_type=DagRunType.SCHEDULED, logical_date=TEST_DATE, state=State.SUCCESS, data_interval=(TEST_DATE, TEST_DATE), - **triggered_by_kwargs, ) # Then sync again after creating the dag run -- this should update next_dagrun @@ -1256,15 +1280,13 @@ def test_fractional_seconds(self): start_date = timezone.utcnow() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - run = dag.create_dagrun( - run_id="test_" + start_date.isoformat(), + run = _create_dagrun( + dag, + run_type=DagRunType.MANUAL, logical_date=start_date, start_date=start_date, state=State.RUNNING, - external_trigger=False, data_interval=(start_date, start_date), - **triggered_by_kwargs, ) run.refresh_from_db() @@ -1389,37 +1411,15 @@ def test_description_from_timetable(self, timetable, expected_description): assert dag.timetable == timetable assert dag.timetable.description == expected_description - def test_create_dagrun_run_id_is_generated(self): - dag = DAG(dag_id="run_id_is_generated", schedule=None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_type=DagRunType.MANUAL, - logical_date=DEFAULT_DATE, - state=State.NONE, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) - assert dr.run_id == f"manual__{DEFAULT_DATE.isoformat()}" - - def test_create_dagrun_run_type_is_obtained_from_run_id(self): - dag = DAG(dag_id="run_type_is_obtained_from_run_id", schedule=None) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun(run_id="scheduled__", state=State.NONE, **triggered_by_kwargs) - assert dr.run_type == DagRunType.SCHEDULED - - dr = dag.create_dagrun( - run_id="custom_is_set_to_manual", - state=State.NONE, - **triggered_by_kwargs, - ) - assert dr.run_type == DagRunType.MANUAL - def test_create_dagrun_job_id_is_set(self): job_id = 42 dag = DAG(dag_id="test_create_dagrun_job_id_is_set", schedule=None) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( run_id="test_create_dagrun_job_id_is_set", + logical_date=DEFAULT_DATE, + data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_type=DagRunType.MANUAL, state=State.NONE, creating_job_id=job_id, **triggered_by_kwargs, @@ -1485,14 +1485,13 @@ def test_clear_set_dagrun_state(self, dag_run_state): t_1 = EmptyOperator(task_id=task_id, dag=dag) session = settings.Session() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun_1 = dag.create_dagrun( + dagrun_1 = _create_dagrun( + dag, run_type=DagRunType.BACKFILL_JOB, state=State.FAILED, start_date=DEFAULT_DATE, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, ) session.merge(dagrun_1) @@ -1536,6 +1535,7 @@ def consumer(value): session = settings.Session() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun_1 = dag.create_dagrun( + run_id="backfill", run_type=DagRunType.BACKFILL_JOB, state=State.FAILED, start_date=DEFAULT_DATE, @@ -1723,6 +1723,7 @@ def test_clear_dag( session = settings.Session() # type: ignore triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun_1 = dag.create_dagrun( + run_id="backfill", run_type=DagRunType.BACKFILL_JOB, state=DagRunState.RUNNING, start_date=DEFAULT_DATE, @@ -1733,6 +1734,8 @@ def test_clear_dag( session.merge(dagrun_1) task_instance_1 = dagrun_1.get_task_instance(task_id) + if TYPE_CHECKING: + assert task_instance_1 task_instance_1.state = ti_state_begin task_instance_1.job_id = 123 session.merge(task_instance_1) @@ -2037,6 +2040,7 @@ def test_validate_params_on_trigger_dag(self): with pytest.raises(ParamValidationError, match="No value passed and Param has no default value"): dag.create_dagrun( run_id="test_dagrun_missing_param", + run_type=DagRunType.MANUAL, state=State.RUNNING, logical_date=TEST_DATE, data_interval=(TEST_DATE, TEST_DATE), @@ -2049,6 +2053,7 @@ def test_validate_params_on_trigger_dag(self): ): dag.create_dagrun( run_id="test_dagrun_missing_param", + run_type=DagRunType.MANUAL, state=State.RUNNING, logical_date=TEST_DATE, conf={"param1": None}, @@ -2059,6 +2064,7 @@ def test_validate_params_on_trigger_dag(self): dag = DAG("dummy-dag", schedule=None, params={"param1": Param(type="string")}) dag.create_dagrun( run_id="test_dagrun_missing_param", + run_type=DagRunType.MANUAL, state=State.RUNNING, logical_date=TEST_DATE, conf={"param1": "hello"}, @@ -2521,6 +2527,7 @@ def test_count_number_queries(self, tasks_count): with assert_queries_count(4): dag.create_dagrun( run_id="test_dagrun_query_count", + run_type=DagRunType.MANUAL, state=State.RUNNING, logical_date=TEST_DATE, data_interval=(TEST_DATE, TEST_DATE), @@ -2595,7 +2602,8 @@ def return_num(num): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( - run_id=DagRunType.MANUAL.value, + run_id="test", + run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=self.DEFAULT_DATE, data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE), @@ -2625,7 +2633,8 @@ def return_num(num): new_value = 52 triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( - run_id=DagRunType.MANUAL.value, + run_id="test", + run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), logical_date=self.DEFAULT_DATE, data_interval=(self.DEFAULT_DATE, self.DEFAULT_DATE), @@ -3119,25 +3128,6 @@ def test_get_asset_triggered_next_run_info_with_unresolved_asset_alias(dag_maker assert dag1_model.get_asset_triggered_next_run_info(session=session) is None -def test_dag_uses_timetable_for_run_id(session): - class CustomRunIdTimetable(Timetable): - def generate_run_id(self, *, run_type, logical_date, data_interval, **extra) -> str: - return "abc" - - dag = DAG(dag_id="test", start_date=DEFAULT_DATE, schedule=CustomRunIdTimetable()) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - - dag_run = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=DagRunState.QUEUED, - logical_date=DEFAULT_DATE, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) - - assert dag_run.run_id == "abc" - - @pytest.mark.parametrize( "run_id_type", [DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED, DagRunType.ASSET_TRIGGERED], diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index f398be8ad88c4..96c06d279bc3b 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -30,7 +30,6 @@ from airflow import settings from airflow.callbacks.callback_requests import DagCallbackRequest from airflow.decorators import setup, task, task_group, teardown -from airflow.exceptions import AirflowException from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG, DagModel from airflow.models.dagrun import DagRun, DagRunNote @@ -111,6 +110,11 @@ def create_dag_run( run_type = DagRunType.MANUAL data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) dag_run = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=run_type, + logical_date=logical_date, + data_interval=data_interval, + ), run_type=run_type, logical_date=logical_date, data_interval=data_interval, @@ -123,6 +127,8 @@ def create_dag_run( if task_states is not None: for task_id, task_state in task_states.items(): ti = dag_run.get_task_instance(task_id) + if TYPE_CHECKING: + assert ti ti.set_state(task_state, session) session.flush() @@ -279,18 +285,11 @@ def test_dagrun_not_stuck_in_running_when_all_tasks_instances_are_removed(self, dag_run.update_state() assert dag_run.state == DagRunState.SUCCESS - def test_dagrun_success_conditions(self, session): - dag = DAG( - "test_dagrun_success_conditions", - schedule=datetime.timedelta(days=1), - start_date=DEFAULT_DATE, - default_args={"owner": "owner1"}, - ) - + def test_dagrun_success_conditions(self, dag_maker, session): # A -> B # A -> C -> D # ordered: B, D, C, A or D, B, C, A or D, C, B, A - with dag: + with dag_maker(schedule=datetime.timedelta(days=1), session=session): op1 = EmptyOperator(task_id="A") op2 = EmptyOperator(task_id="B") op3 = EmptyOperator(task_id="C") @@ -298,18 +297,7 @@ def test_dagrun_success_conditions(self, session): op1.set_upstream([op2, op3]) op3.set_upstream(op4) - dag.clear() - - now = pendulum.now("UTC") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_id="test_dagrun_success_conditions", - state=DagRunState.RUNNING, - logical_date=now, - data_interval=dag.timetable.infer_manual_data_interval(run_after=now), - start_date=now, - **triggered_by_kwargs, - ) + dr = dag_maker.create_dagrun() # op1 = root ti_op1 = dr.get_task_instance(task_id=op1.task_id) @@ -330,32 +318,14 @@ def test_dagrun_success_conditions(self, session): dr.update_state() assert dr.state == DagRunState.SUCCESS - def test_dagrun_deadlock(self, session): - dag = DAG( - "text_dagrun_deadlock", - schedule=datetime.timedelta(days=1), - start_date=DEFAULT_DATE, - default_args={"owner": "owner1"}, - ) - - with dag: + def test_dagrun_deadlock(self, dag_maker, session): + with dag_maker(schedule=datetime.timedelta(days=1), session=session): op1 = EmptyOperator(task_id="A") op2 = EmptyOperator(task_id="B") op2.trigger_rule = TriggerRule.ONE_FAILED op2.set_upstream(op1) - dag.clear() - now = pendulum.now("UTC") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_id="test_dagrun_deadlock", - state=DagRunState.RUNNING, - logical_date=now, - data_interval=dag.timetable.infer_manual_data_interval(run_after=now), - start_date=now, - session=session, - **triggered_by_kwargs, - ) + dr = dag_maker.create_dagrun() ti_op1: TI = dr.get_task_instance(task_id=op1.task_id, session=session) ti_op2: TI = dr.get_task_instance(task_id=op2.task_id, session=session) @@ -370,56 +340,32 @@ def test_dagrun_deadlock(self, session): dr.update_state(session=session) assert dr.state == DagRunState.FAILED - def test_dagrun_no_deadlock_with_restarting(self, session): - dag = DAG( - "test_dagrun_no_deadlock_with_restarting", - schedule=datetime.timedelta(days=1), - start_date=DEFAULT_DATE, - ) - with dag: + def test_dagrun_no_deadlock_with_restarting(self, dag_maker, session): + with dag_maker(schedule=datetime.timedelta(days=1)): op1 = EmptyOperator(task_id="upstream_task") op2 = EmptyOperator(task_id="downstream_task") op2.set_upstream(op1) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_id="test_dagrun_no_deadlock_with_shutdown", - state=DagRunState.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - start_date=DEFAULT_DATE, - **triggered_by_kwargs, - ) + dr = dag_maker.create_dagrun() upstream_ti = dr.get_task_instance(task_id="upstream_task") upstream_ti.set_state(TaskInstanceState.RESTARTING, session=session) dr.update_state() assert dr.state == DagRunState.RUNNING - def test_dagrun_no_deadlock_with_depends_on_past(self, session): - dag = DAG("test_dagrun_no_deadlock", schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE) - with dag: + def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session): + with dag_maker(schedule=datetime.timedelta(days=1)): EmptyOperator(task_id="dop", depends_on_past=True) EmptyOperator(task_id="tc", max_active_tis_per_dag=1) - dag.clear() - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( + dr = dag_maker.create_dagrun( run_id="test_dagrun_no_deadlock_1", - state=DagRunState.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), start_date=DEFAULT_DATE, - **triggered_by_kwargs, ) - next_date = DEFAULT_DATE + datetime.timedelta(days=1) - dr2 = dag.create_dagrun( + dr2 = dag_maker.create_dagrun_after( + dr, run_id="test_dagrun_no_deadlock_2", - state=DagRunState.RUNNING, - logical_date=next_date, - data_interval=dag.timetable.infer_manual_data_interval(run_after=next_date), - start_date=next_date, - **triggered_by_kwargs, + start_date=DEFAULT_DATE + datetime.timedelta(days=1), ) ti1_op1 = dr.get_task_instance(task_id="dop") dr2.get_task_instance(task_id="dop") @@ -594,26 +540,11 @@ def on_failure_callable(context): msg="task_failure", ) - def test_dagrun_set_state_end_date(self, session): - dag = DAG( - "test_dagrun_set_state_end_date", - schedule=datetime.timedelta(days=1), - start_date=DEFAULT_DATE, - default_args={"owner": "owner1"}, - ) - - dag.clear() + def test_dagrun_set_state_end_date(self, dag_maker, session): + with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE): + pass - now = pendulum.now("UTC") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_id="test_dagrun_set_state_end_date", - state=DagRunState.RUNNING, - logical_date=now, - data_interval=dag.timetable.infer_manual_data_interval(now), - start_date=now, - **triggered_by_kwargs, - ) + dr = dag_maker.create_dagrun() # Initial end_date should be NULL # DagRunState.SUCCESS and DagRunState.FAILED are all ending state and should set end_date @@ -626,7 +557,7 @@ def test_dagrun_set_state_end_date(self, session): session.merge(dr) session.commit() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_set_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr_database.end_date is not None assert dr.end_date == dr_database.end_date @@ -634,44 +565,26 @@ def test_dagrun_set_state_end_date(self, session): session.merge(dr) session.commit() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_set_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr_database.end_date is None dr.set_state(DagRunState.FAILED) session.merge(dr) session.commit() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_set_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr_database.end_date is not None assert dr.end_date == dr_database.end_date - def test_dagrun_update_state_end_date(self, session): - dag = DAG( - "test_dagrun_update_state_end_date", - schedule=datetime.timedelta(days=1), - start_date=DEFAULT_DATE, - default_args={"owner": "owner1"}, - ) - + def test_dagrun_update_state_end_date(self, dag_maker, session): # A -> B - with dag: + with dag_maker(schedule=datetime.timedelta(days=1)): op1 = EmptyOperator(task_id="A") op2 = EmptyOperator(task_id="B") op1.set_upstream(op2) - dag.clear() - - now = pendulum.now("UTC") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dr = dag.create_dagrun( - run_id="test_dagrun_update_state_end_date", - state=DagRunState.RUNNING, - logical_date=now, - data_interval=dag.timetable.infer_manual_data_interval(now), - start_date=now, - **triggered_by_kwargs, - ) + dr = dag_maker.create_dagrun() # Initial end_date should be NULL # DagRunState.SUCCESS and DagRunState.FAILED are all ending state and should set end_date @@ -687,7 +600,7 @@ def test_dagrun_update_state_end_date(self, session): dr.update_state() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_update_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr_database.end_date is not None assert dr.end_date == dr_database.end_date @@ -695,7 +608,7 @@ def test_dagrun_update_state_end_date(self, session): ti_op2.set_state(state=TaskInstanceState.RUNNING, session=session) dr.update_state() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_update_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr._state == DagRunState.RUNNING assert dr.end_date is None @@ -705,7 +618,7 @@ def test_dagrun_update_state_end_date(self, session): ti_op2.set_state(state=TaskInstanceState.FAILED, session=session) dr.update_state() - dr_database = session.query(DagRun).filter(DagRun.run_id == "test_dagrun_update_state_end_date").one() + dr_database = session.query(DagRun).filter(DagRun.run_id == dr.run_id).one() assert dr_database.end_date is not None assert dr.end_date == dr_database.end_date @@ -925,6 +838,11 @@ def test_next_dagruns_to_examine_only_unpaused(self, session, state): session.flush() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.SCHEDULED, + logical_date=DEFAULT_DATE, + data_interval=dag.infer_automated_data_interval(DEFAULT_DATE), + ), run_type=DagRunType.SCHEDULED, state=state, logical_date=DEFAULT_DATE, @@ -998,6 +916,11 @@ def test_emit_scheduling_delay(self, session, schedule, expected): session.flush() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.SCHEDULED, + logical_date=dag.start_date, + data_interval=dag.infer_automated_data_interval(dag.start_date), + ), run_type=DagRunType.SCHEDULED, state=DagRunState.SUCCESS, logical_date=dag.start_date, @@ -1065,12 +988,7 @@ def test_update_state_one_unfinished(self, dag_maker, session): with dag_maker(session=session) as dag: PythonOperator(task_id="t1", python_callable=lambda: print) PythonOperator(task_id="t2", python_callable=lambda: print) - dr = dag.create_dagrun( - state=DagRunState.FAILED, - triggered_by=DagRunTriggeredByType.TEST, - run_id="abc123", - session=session, - ) + dr = dag_maker.create_dagrun(state=DagRunState.FAILED) for ti in dr.get_task_instances(session=session): ti.state = TaskInstanceState.FAILED session.commit() @@ -2821,9 +2739,10 @@ def make_task(task_id, dag): def test_dag_run_id_config(session, dag_maker, pattern, run_id, result): with conf_vars({("scheduler", "allowed_run_id_pattern"): pattern}): with dag_maker(): - ... + pass + run_type = DagRunType.from_run_id(run_id) if result: - dag_maker.create_dagrun(run_id=run_id) + dag_maker.create_dagrun(run_id=run_id, run_type=run_type) else: - with pytest.raises(AirflowException): - dag_maker.create_dagrun(run_id=run_id) + with pytest.raises(ValueError): + dag_maker.create_dagrun(run_id=run_id, run_type=run_type) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 0422ceb39949d..29506c6676118 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1748,6 +1748,8 @@ def test_xcom_pull_different_logical_date(self, create_task_instance): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = ti.task.dag.create_dagrun( run_id="test2", + run_type=DagRunType.MANUAL, + logical_date=exec_date, data_interval=(exec_date, exec_date), state=None, **triggered_by_kwargs, @@ -2022,6 +2024,7 @@ def test_get_num_running_task_instances(self, create_task_instance): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dr = ti1.task.dag.create_dagrun( logical_date=logical_date, + run_type=DagRunType.MANUAL, state=None, run_id="2", session=session, diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index c73980ca24bd5..d70c63263b91a 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -45,11 +45,12 @@ from airflow.providers.standard.sensors.time import TimeSensor from airflow.providers.standard.triggers.external_task import WorkflowTrigger from airflow.serialization.serialized_objects import SerializedBaseOperator +from airflow.timetables.base import DataInterval from airflow.utils.hashlib_wrapper import md5 from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.task_group import TaskGroup -from airflow.utils.timezone import datetime +from airflow.utils.timezone import coerce_datetime, datetime from airflow.utils.types import DagRunType from tests.models import TEST_DAGS_FOLDER @@ -438,17 +439,11 @@ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, c f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " ) in caplog.messages - def test_external_dag_sensor(self): - other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - other_dag.create_dagrun( - run_id="test", - start_date=DEFAULT_DATE, - logical_date=DEFAULT_DATE, - state=State.SUCCESS, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + def test_external_dag_sensor(self, dag_maker): + with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"): + pass + dag_maker.create_dagrun(state=DagRunState.SUCCESS) + op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", external_dag_id="other_dag", @@ -457,17 +452,10 @@ def test_external_dag_sensor(self): ) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - def test_external_dag_sensor_log(self, caplog): - other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - other_dag.create_dagrun( - run_id="test", - start_date=DEFAULT_DATE, - logical_date=DEFAULT_DATE, - state=State.SUCCESS, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + def test_external_dag_sensor_log(self, caplog, dag_maker): + with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"): + pass + dag_maker.create_dagrun(state=DagRunState.SUCCESS) op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", external_dag_id="other_dag", @@ -476,17 +464,10 @@ def test_external_dag_sensor_log(self, caplog): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) assert (f"Poking for DAG 'other_dag' on {DEFAULT_DATE.isoformat()} ... ") in caplog.messages - def test_external_dag_sensor_soft_fail_as_skipped(self): - other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once") - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - other_dag.create_dagrun( - run_id="test", - start_date=DEFAULT_DATE, - logical_date=DEFAULT_DATE, - state=State.SUCCESS, - data_interval=(DEFAULT_DATE, DEFAULT_DATE), - **triggered_by_kwargs, - ) + def test_external_dag_sensor_soft_fail_as_skipped(self, dag_maker, session): + with dag_maker("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once"): + pass + dag_maker.create_dagrun(state=DagRunState.SUCCESS) op = ExternalTaskSensor( task_id="test_external_dag_sensor_check", external_dag_id="other_dag", @@ -501,7 +482,6 @@ def test_external_dag_sensor_soft_fail_as_skipped(self): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) # then - session = settings.Session() TI = TaskInstance task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all() assert len(task_instances) == 1, "Unexpected number of task instances" @@ -1252,19 +1232,24 @@ def run_tasks( """ runs: dict[str, DagRun] = {} tis: dict[str, TaskInstance] = {} - triggered_by = DagRunTriggeredByType.TEST if AIRFLOW_V_3_0_PLUS else None for dag in dag_bag.dags.values(): - dagrun = dag.create_dagrun( - state=DagRunState.RUNNING, + data_interval = DataInterval(coerce_datetime(logical_date), coerce_datetime(logical_date)) + runs[dag.dag_id] = dagrun = dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.MANUAL, + logical_date=logical_date, + data_interval=data_interval, + ), logical_date=logical_date, - start_date=logical_date, + data_interval=data_interval, run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.TEST, + dag_version=None, + state=DagRunState.RUNNING, + start_date=logical_date, session=session, - data_interval=(logical_date, logical_date), - triggered_by=triggered_by, ) - runs[dag.dag_id] = dagrun # we use sorting by task_id here because for the test DAG structure of ours # this is equivalent to topological sort. It would not work in general case # but it works for our case because we specifically constructed test DAGS diff --git a/tests/task/test_standard_task_runner.py b/tests/task/test_standard_task_runner.py index 5e9eee15b07dd..51c5f9f8fa3e8 100644 --- a/tests/task/test_standard_task_runner.py +++ b/tests/task/test_standard_task_runner.py @@ -40,6 +40,7 @@ from airflow.utils.platform import getuser from airflow.utils.state import State from airflow.utils.timeout import timeout +from airflow.utils.types import DagRunType from tests.listeners import xcom_listener from tests.listeners.file_write_listener import FileWriteListener @@ -173,7 +174,7 @@ def test_start_and_terminate(self, mock_init, mock_read_task_utilization): mock_read_task_utilization.assert_called() @pytest.mark.db_test - def test_notifies_about_start_and_stop(self, tmp_path): + def test_notifies_about_start_and_stop(self, tmp_path, session): path_listener_writer = tmp_path / "test_notifies_about_start_and_stop" lm = get_listener_manager() @@ -185,14 +186,19 @@ def test_notifies_about_start_and_stop(self, tmp_path): ) dag = dagbag.dags.get("test_example_bash_operator") task = dag.get_task("runme_1") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, + session=session, **triggered_by_kwargs, ) + session.commit() ti = TaskInstance(task=task, run_id="test") job = Job(dag_id=ti.dag_id) job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True) @@ -228,12 +234,15 @@ def test_notifies_about_fail(self, tmp_path): ) dag = dagbag.dags.get("test_failing_bash_operator") task = dag.get_task("failing_task") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, **triggered_by_kwargs, ) ti = TaskInstance(task=task, run_id="test") @@ -275,12 +284,15 @@ def test_ol_does_not_block_xcoms(self, tmp_path): ) dag = dagbag.dags.get("test_dag_xcom_openlineage") task = dag.get_task("push_and_pull") + current_time = timezone.utcnow() triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", - data_interval=(DEFAULT_DATE, DEFAULT_DATE), + logical_date=current_time, + data_interval=(current_time, current_time), + run_type=DagRunType.MANUAL, state=State.RUNNING, - start_date=DEFAULT_DATE, + start_date=current_time, **triggered_by_kwargs, ) @@ -408,7 +420,9 @@ def test_on_kill(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( run_id="test", + logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, **triggered_by_kwargs, @@ -463,7 +477,9 @@ def test_parsing_context(self): dag.create_dagrun( run_id="test", + logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), + run_type=DagRunType.MANUAL, state=State.RUNNING, start_date=DEFAULT_DATE, **triggered_by_kwargs, diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 19a432bb73767..454af48d66763 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -36,7 +36,6 @@ from airflow.executors import executor_loader from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import TriggererJobRunner -from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.models.trigger import Trigger @@ -56,10 +55,6 @@ from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType pytestmark = pytest.mark.db_test @@ -91,24 +86,17 @@ def test_default_task_logging_setup(self): handler = handlers[0] assert handler.name == FILE_TASK_HANDLER - def test_file_task_handler_when_ti_value_is_invalid(self): + def test_file_task_handler_when_ti_value_is_invalid(self, dag_maker): def task_callable(ti): ti.log.info("test") - dag = DAG("dag_for_testing_file_task_handler", schedule=None, start_date=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) - task = PythonOperator( - task_id="task_for_testing_file_log_handler", - dag=dag, - python_callable=task_callable, - ) + with dag_maker("dag_for_testing_file_task_handler", schedule=None): + task = PythonOperator( + task_id="task_for_testing_file_log_handler", + python_callable=task_callable, + ) + + dagrun = dag_maker.create_dagrun() ti = TaskInstance(task=task, run_id=dagrun.run_id) logger = ti.log @@ -146,26 +134,22 @@ def task_callable(ti): # Remove the generated tmp log file. os.remove(log_filename) - def test_file_task_handler(self): + def test_file_task_handler(self, dag_maker, session): def task_callable(ti): ti.log.info("test") - dag = DAG("dag_for_testing_file_task_handler", schedule=None, start_date=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) - task = PythonOperator( - task_id="task_for_testing_file_log_handler", - dag=dag, - python_callable=task_callable, - ) - ti = TaskInstance(task=task, run_id=dagrun.run_id) + with dag_maker("dag_for_testing_file_task_handler", schedule=None, session=session): + PythonOperator( + task_id="task_for_testing_file_log_handler", + python_callable=task_callable, + ) + + dagrun = dag_maker.create_dagrun() + + (ti,) = dagrun.get_task_instances() ti.try_number += 1 + session.merge(ti) + session.flush() logger = ti.log ti.log.disabled = False @@ -203,24 +187,16 @@ def task_callable(ti): # Remove the generated tmp log file. os.remove(log_filename) - def test_file_task_handler_running(self): + def test_file_task_handler_running(self, dag_maker): def task_callable(ti): ti.log.info("test") - dag = DAG("dag_for_testing_file_task_handler", schedule=None, start_date=DEFAULT_DATE) - task = PythonOperator( - task_id="task_for_testing_file_log_handler", - python_callable=task_callable, - dag=dag, - ) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) + with dag_maker("dag_for_testing_file_task_handler", schedule=None): + task = PythonOperator( + task_id="task_for_testing_file_log_handler", + python_callable=task_callable, + ) + dagrun = dag_maker.create_dagrun() ti = TaskInstance(task=task, run_id=dagrun.run_id) ti.try_number = 2 @@ -256,7 +232,7 @@ def task_callable(ti): # Remove the generated tmp log file. os.remove(log_filename) - def test_file_task_handler_rotate_size_limit(self): + def test_file_task_handler_rotate_size_limit(self, dag_maker): def reset_log_config(update_conf): import logging.config @@ -270,20 +246,12 @@ def task_callable(ti): max_bytes_size = 60000 update_conf = {"handlers": {"task": {"max_bytes": max_bytes_size, "backup_count": 1}}} reset_log_config(update_conf) - dag = DAG("dag_for_testing_file_task_handler_rotate_size_limit", start_date=DEFAULT_DATE) - task = PythonOperator( - task_id="task_for_testing_file_log_handler_rotate_size_limit", - python_callable=task_callable, - dag=dag, - ) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - run_type=DagRunType.MANUAL, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), - **triggered_by_kwargs, - ) + with dag_maker("dag_for_testing_file_task_handler_rotate_size_limit"): + task = PythonOperator( + task_id="task_for_testing_file_log_handler_rotate_size_limit", + python_callable=task_callable, + ) + dagrun = dag_maker.create_dagrun() ti = TaskInstance(task=task, run_id=dagrun.run_id) ti.try_number = 1 diff --git a/tests/utils/test_sqlalchemy.py b/tests/utils/test_sqlalchemy.py index 0e791de19552d..5b30595e4a305 100644 --- a/tests/utils/test_sqlalchemy.py +++ b/tests/utils/test_sqlalchemy.py @@ -42,6 +42,7 @@ ) from airflow.utils.state import State from airflow.utils.timezone import utcnow +from airflow.utils.types import DagRunType from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @@ -81,6 +82,7 @@ def test_utc_transformations(self): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} run = dag.create_dagrun( run_id=iso_date, + run_type=DagRunType.MANUAL, state=State.NONE, logical_date=logical_date, start_date=start_date, @@ -115,6 +117,7 @@ def test_process_bind_param_naive(self): with pytest.raises((ValueError, StatementError)): dag.create_dagrun( run_id=start_date.isoformat, + run_type=DagRunType.MANUAL, state=State.NONE, logical_date=start_date, start_date=start_date, diff --git a/tests/utils/test_state.py b/tests/utils/test_state.py index 5ad9f4a7044ad..e99bc0bd08387 100644 --- a/tests/utils/test_state.py +++ b/tests/utils/test_state.py @@ -44,6 +44,11 @@ def test_dagrun_state_enum_escape(): dag = DAG(dag_id="test_dagrun_state_enum_escape", schedule=timedelta(days=1), start_date=DEFAULT_DATE) triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( + run_id=dag.timetable.generate_run_id( + run_type=DagRunType.SCHEDULED, + logical_date=DEFAULT_DATE, + data_interval=dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE), + ), run_type=DagRunType.SCHEDULED, state=DagRunState.QUEUED, logical_date=DEFAULT_DATE, diff --git a/tests/utils/test_types.py b/tests/utils/test_types.py index c66c58079880a..4a6831f40354d 100644 --- a/tests/utils/test_types.py +++ b/tests/utils/test_types.py @@ -20,60 +20,42 @@ import pytest -from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.types import DagRunType -from tests.models import DEFAULT_DATE -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS - -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test -def test_runtype_enum_escape(): +def test_runtype_enum_escape(dag_maker, session): """ Make sure DagRunType.SCHEDULE is converted to string 'scheduled' when referenced in DB query """ - with create_session() as session: - dag = DAG(dag_id="test_enum_dags", schedule=timedelta(days=1), start_date=DEFAULT_DATE) - data_interval = dag.timetable.infer_manual_data_interval(run_after=DEFAULT_DATE) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - logical_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - session=session, - data_interval=data_interval, - **triggered_by_kwargs, - ) - - query = session.query( - DagRun.dag_id, - DagRun.state, - DagRun.run_type, - ).filter( - DagRun.dag_id == dag.dag_id, - # make sure enum value can be used in filter queries - DagRun.run_type == DagRunType.SCHEDULED, - ) - assert str(query.statement.compile(compile_kwargs={"literal_binds": True})) == ( - "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n" - "FROM dag_run \n" - "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 'scheduled'" - ) - - rows = query.all() - assert len(rows) == 1 - assert rows[0].dag_id == dag.dag_id - assert rows[0].state == State.RUNNING - # make sure value in db is stored as `scheduled`, not `DagRunType.SCHEDULED` - assert rows[0].run_type == "scheduled" - - session.rollback() + with dag_maker(dag_id="test_enum_dags", schedule=timedelta(days=1), session=session): + pass + dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + + query = session.query( + DagRun.dag_id, + DagRun.state, + DagRun.run_type, + ).filter( + DagRun.dag_id == "test_enum_dags", + # make sure enum value can be used in filter queries + DagRun.run_type == DagRunType.SCHEDULED, + ) + assert str(query.statement.compile(compile_kwargs={"literal_binds": True})) == ( + "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n" + "FROM dag_run \n" + "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 'scheduled'" + ) + + rows = query.all() + assert len(rows) == 1 + assert rows[0].dag_id == "test_enum_dags" + assert rows[0].state == State.RUNNING + # make sure value in db is stored as `scheduled`, not `DagRunType.SCHEDULED` + assert rows[0].run_type == "scheduled" + + session.rollback() diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py index c87848fdf0e72..2e652f7811a63 100644 --- a/tests/www/test_utils.py +++ b/tests/www/test_utils.py @@ -616,16 +616,10 @@ def test_dag_run_custom_sqla_interface_delete_no_collateral_damage(dag_maker, se interface = DagRunCustomSQLAInterface(obj=DagRun, session=session) dag_ids = (f"test_dag_{x}" for x in range(1, 4)) dates = (pendulum.datetime(2023, 1, x) for x in range(1, 4)) - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} for dag_id, date in itertools.product(dag_ids, dates): - with dag_maker(dag_id=dag_id) as dag: - dag.create_dagrun( - logical_date=date, - state="running", - run_type="scheduled", - data_interval=(date, date), - **triggered_by_kwargs, - ) + with dag_maker(dag_id=dag_id): + pass + dag_maker.create_dagrun(logical_date=date, state="running", run_type="scheduled") dag_runs = session.query(DagRun).all() assert len(dag_runs) == 9 assert len(set(x.run_id for x in dag_runs)) == 3 diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index c138e24125a90..6ac7039cebd8c 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -43,12 +43,8 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.mock_plugins import mock_plugin_manager -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS from tests_common.test_utils.www import check_content_in_response, check_content_not_in_response -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - pytestmark = pytest.mark.db_test @@ -312,13 +308,12 @@ def test_app(): return app.create_app(testing=True) -def test_mark_task_instance_state(test_app): +def test_mark_task_instance_state(test_app, dag_maker): """ Test that _mark_task_instance_state() does all three things: - Marks the given TaskInstance as SUCCESS; - Clears downstream TaskInstances in FAILED/UPSTREAM_FAILED state; """ - from airflow.models.dag import DAG from airflow.models.dagbag import DagBag from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator @@ -332,7 +327,7 @@ def test_mark_task_instance_state(test_app): clear_db_runs() start_date = datetime(2020, 1, 1) - with DAG("test_mark_task_instance_state", start_date=start_date, schedule="0 0 * * *") as dag: + with dag_maker("test_mark_task_instance_state", start_date=start_date, schedule="0 0 * * *") as dag: task_1 = EmptyOperator(task_id="task_1") task_2 = EmptyOperator(task_id="task_2") task_3 = EmptyOperator(task_id="task_3") @@ -341,15 +336,7 @@ def test_mark_task_instance_state(test_app): task_1 >> [task_2, task_3, task_4, task_5] - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - start_date=start_date, - logical_date=start_date, - data_interval=(start_date, start_date), - state=State.FAILED, - run_type=DagRunType.SCHEDULED, - **triggered_by_kwargs, - ) + dagrun = dag_maker.create_dagrun(state=State.FAILED, run_type=DagRunType.SCHEDULED) def get_task_instance(session, task): return ( @@ -405,14 +392,13 @@ def get_task_instance(session, task): assert dagrun.get_state() == State.QUEUED -def test_mark_task_group_state(test_app): +def test_mark_task_group_state(test_app, dag_maker): """ Test that _mark_task_group_state() does all three things: - Marks the given TaskGroup as SUCCESS; - Clears downstream TaskInstances in FAILED/UPSTREAM_FAILED state; - Set DagRun to QUEUED. """ - from airflow.models.dag import DAG from airflow.models.dagbag import DagBag from airflow.models.taskinstance import TaskInstance from airflow.operators.empty import EmptyOperator @@ -426,7 +412,7 @@ def test_mark_task_group_state(test_app): clear_db_runs() start_date = datetime(2020, 1, 1) - with DAG("test_mark_task_group_state", start_date=start_date, schedule="0 0 * * *") as dag: + with dag_maker("test_mark_task_group_state", start_date=start_date, schedule="0 0 * * *") as dag: start = EmptyOperator(task_id="start") with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: @@ -444,15 +430,7 @@ def test_mark_task_group_state(test_app): start >> section_1 >> [task_4, task_5, task_6, task_7, task_8] - triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} - dagrun = dag.create_dagrun( - start_date=start_date, - logical_date=start_date, - data_interval=(start_date, start_date), - state=State.FAILED, - run_type=DagRunType.SCHEDULED, - **triggered_by_kwargs, - ) + dagrun = dag_maker.create_dagrun(state=State.FAILED, run_type=DagRunType.SCHEDULED) def get_task_instance(session, task): return ( diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 21379550dd1fd..c28026973eb52 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -159,6 +159,7 @@ def _init_dagruns(acl_app, _reset_dagruns): **triggered_by_kwargs, ) acl_app.dag_bag.get_dag("example_python_operator").create_dagrun( + run_id=DEFAULT_RUN_ID, run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, start_date=timezone.utcnow(), diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py index c7f860d624642..12f375b870e9e 100644 --- a/tests/www/views/test_views_dagrun.py +++ b/tests/www/views/test_views_dagrun.py @@ -23,6 +23,7 @@ from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import create_session +from airflow.utils.types import DagRunType from airflow.www.views import DagRunModelView from providers.tests.fab.auth_manager.api_endpoints.api_connexion_utils import ( @@ -146,6 +147,7 @@ def running_dag_run(session): logical_date=logical_date, data_interval=(logical_date, logical_date), run_id="test_dag_runs_action", + run_type=DagRunType.MANUAL, session=session, **triggered_by_kwargs, ) @@ -169,6 +171,7 @@ def completed_dag_run_with_missing_task(session): logical_date=logical_date, data_interval=(logical_date, logical_date), run_id="test_dag_runs_action", + run_type=DagRunType.MANUAL, session=session, **triggered_by_kwargs, ) @@ -324,6 +327,7 @@ def dag_run_with_all_done_task(session): logical_date=logical_date, data_interval=(logical_date, logical_date), run_id="test_dagrun_failed", + run_type=DagRunType.MANUAL, session=session, **triggered_by_kwargs, ) diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py index 6c0a0a7d78172..02be0ebd3681c 100644 --- a/tests/www/views/test_views_decorators.py +++ b/tests/www/views/test_views_decorators.py @@ -62,6 +62,7 @@ def xcom_dag(dagbag): def dagruns(bash_dag, xcom_dag): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} bash_dagrun = bash_dag.create_dagrun( + run_id="test_bash", run_type=DagRunType.SCHEDULED, logical_date=EXAMPLE_DAG_DEFAULT_DATE, data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE), @@ -71,6 +72,7 @@ def dagruns(bash_dag, xcom_dag): ) xcom_dagrun = xcom_dag.create_dagrun( + run_id="test_xcom", run_type=DagRunType.SCHEDULED, logical_date=EXAMPLE_DAG_DEFAULT_DATE, data_interval=(EXAMPLE_DAG_DEFAULT_DATE, EXAMPLE_DAG_DEFAULT_DATE), diff --git a/tests/www/views/test_views_extra_links.py b/tests/www/views/test_views_extra_links.py index 6ea977da7fa4c..8ae2c63b818e0 100644 --- a/tests/www/views/test_views_extra_links.py +++ b/tests/www/views/test_views_extra_links.py @@ -90,6 +90,7 @@ def create_dag_run(dag): def _create_dag_run(*, logical_date, session): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} return dag.create_dagrun( + run_id=f"manual__{logical_date.isoformat()}", state=DagRunState.RUNNING, logical_date=logical_date, data_interval=(logical_date, logical_date), diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index 735c9a569e093..ae264d3f63829 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -160,6 +160,7 @@ def tis(dags, session): dag, dag_removed = dags triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dagrun = dag.create_dagrun( + run_id=f"scheduled__{DEFAULT_DATE.isoformat()}", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), @@ -172,6 +173,7 @@ def tis(dags, session): ti.try_number = 1 ti.hostname = "localhost" dagrun_removed = dag_removed.create_dagrun( + run_id=f"scheduled__{DEFAULT_DATE.isoformat()}", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), diff --git a/tests/www/views/test_views_rendered.py b/tests/www/views/test_views_rendered.py index 0af32512c4d33..5f4b8063d0075 100644 --- a/tests/www/views/test_views_rendered.py +++ b/tests/www/views/test_views_rendered.py @@ -147,6 +147,7 @@ def create_dag_run(dag, task1, task2, task3, task4, task_secret): def _create_dag_run(*, logical_date, session): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag_run = dag.create_dagrun( + run_id="test", state=DagRunState.RUNNING, logical_date=logical_date, data_interval=(logical_date, logical_date), @@ -343,6 +344,7 @@ def test_rendered_task_detail_env_secret(patch_app, admin_client, request, env, with create_session() as session: triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} dag.create_dagrun( + run_id="test", state=DagRunState.RUNNING, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 10f42aca6c4f2..44c4316058171 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -397,6 +397,7 @@ def test_rendered_k8s_without_k8s(admin_client): def test_tree_trigger_origin_tree_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( + run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), @@ -415,6 +416,7 @@ def test_tree_trigger_origin_tree_view(app, admin_client): def test_graph_trigger_origin_grid_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( + run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), @@ -433,6 +435,7 @@ def test_graph_trigger_origin_grid_view(app, admin_client): def test_gantt_trigger_origin_grid_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} app.dag_bag.get_dag("example_bash_operator").create_dagrun( + run_id="test", run_type=DagRunType.SCHEDULED, logical_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py index b4d3c089a3740..0516f8431fe76 100644 --- a/tests_common/pytest_plugin.py +++ b/tests_common/pytest_plugin.py @@ -889,31 +889,42 @@ def create_dagrun(self, *, logical_date=None, **kwargs): "session": self.session, **kwargs, } - # Need to provide run_id if the user does not either provide one - # explicitly, or pass run_type for inference in dag.create_dagrun(). - if "run_id" not in kwargs and "run_type" not in kwargs: - kwargs["run_id"] = "test" - if "run_type" not in kwargs: - kwargs["run_type"] = DagRunType.from_run_id(kwargs["run_id"]) + run_type = kwargs.get("run_type", DagRunType.MANUAL) + if not isinstance(run_type, DagRunType): + run_type = DagRunType(run_type) if logical_date is None: - if kwargs["run_type"] == DagRunType.MANUAL: + if run_type == DagRunType.MANUAL: logical_date = self.start_date else: logical_date = dag.next_dagrun_info(None).logical_date logical_date = timezone.coerce_datetime(logical_date) - if "data_interval" not in kwargs: - if kwargs["run_type"] == DagRunType.MANUAL: + try: + data_interval = kwargs["data_interval"] + except KeyError: + if run_type == DagRunType.MANUAL: data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) else: data_interval = dag.infer_automated_data_interval(logical_date) kwargs["data_interval"] = data_interval + if "run_id" not in kwargs: + if "run_type" not in kwargs: + kwargs["run_id"] = "test" + else: + kwargs["run_id"] = dag.timetable.generate_run_id( + run_type=run_type, + logical_date=logical_date, + data_interval=data_interval, + ) + kwargs["run_type"] = run_type + if AIRFLOW_V_3_0_PLUS: kwargs.setdefault("triggered_by", DagRunTriggeredByType.TEST) kwargs["logical_date"] = logical_date + kwargs["dag_version"] = None else: kwargs.pop("triggered_by", None) kwargs["execution_date"] = logical_date