Refactor DAG.create_dagrun() arguments (apache#45370)
uranusjr authored and HariGS-DB committed Jan 16, 2025
1 parent 82ef805 commit 4bdadd5
Showing 40 changed files with 784 additions and 803 deletions.
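The hunks below all touch call sites of DAG.create_dagrun(). Read together, they suggest the refactor standardizes a keyword-only calling convention: every caller now supplies run_id itself (usually via dag.timetable.generate_run_id()) and passes the remaining arguments in one fixed order. The sketch that follows is inferred from the call sites in this commit, not from airflow/models/dag.py (which is not part of this excerpt); the helper name and the particular mix of optional arguments are illustrative assumptions.

# Illustrative sketch only: the keyword order the updated call sites converge on.
# create_manual_run and its parameters are hypothetical; only the create_dagrun
# keywords themselves appear in the diffs below.
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType


def create_manual_run(dag, run_id, logical_date, data_interval, conf, session):
    return dag.create_dagrun(
        run_id=run_id,                 # callers now always pass an explicit run_id
        logical_date=logical_date,
        data_interval=data_interval,
        conf=conf,
        run_type=DagRunType.MANUAL,
        triggered_by=DagRunTriggeredByType.REST_API,
        external_trigger=True,
        dag_version=None,              # call sites mostly pass DagVersion.get_latest_version(dag.dag_id)
        state=DagRunState.QUEUED,
        session=session,
    )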
airflow/api/common/trigger_dag.py (7 changes: 4 additions & 3 deletions)

@@ -101,12 +101,13 @@ def _trigger_dag(
     dag_run = dag.create_dagrun(
         run_id=run_id,
         logical_date=logical_date,
-        state=DagRunState.QUEUED,
+        data_interval=data_interval,
         conf=run_conf,
+        run_type=DagRunType.MANUAL,
+        triggered_by=triggered_by,
         external_trigger=True,
         dag_version=dag_version,
-        data_interval=data_interval,
-        triggered_by=triggered_by,
+        state=DagRunState.QUEUED,
         session=session,
     )

airflow/api_connexion/endpoints/dag_run_endpoint.py (9 changes: 4 additions & 5 deletions)

@@ -347,18 +347,17 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
             )
         else:
             data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
-        dag_version = DagVersion.get_latest_version(dag.dag_id)
         dag_run = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
             run_id=run_id,
             logical_date=logical_date,
             data_interval=data_interval,
-            state=DagRunState.QUEUED,
             conf=post_body.get("conf"),
+            run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.REST_API,
             external_trigger=True,
-            dag_version=dag_version,
+            dag_version=DagVersion.get_latest_version(dag.dag_id),
+            state=DagRunState.QUEUED,
             session=session,
-            triggered_by=DagRunTriggeredByType.REST_API,
         )
         dag_run_note = post_body.get("note")
         if dag_run_note:

airflow/api_fastapi/core_api/routes/public/dag_run.py (20 changes: 14 additions & 6 deletions)

@@ -342,7 +342,6 @@ def trigger_dag_run(
             f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
         )
 
-    run_id = body.dag_run_id
     logical_date = pendulum.instance(body.logical_date)
 
     try:
@@ -355,18 +354,27 @@
             )
         else:
             data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
-        dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+        if body.dag_run_id:
+            run_id = body.dag_run_id
+        else:
+            run_id = dag.timetable.generate_run_id(
+                run_type=DagRunType.MANUAL,
+                logical_date=logical_date,
+                data_interval=data_interval,
+            )
+
         dag_run = dag.create_dagrun(
-            run_type=DagRunType.MANUAL,
             run_id=run_id,
             logical_date=logical_date,
             data_interval=data_interval,
-            state=DagRunState.QUEUED,
             conf=body.conf,
+            run_type=DagRunType.MANUAL,
+            triggered_by=DagRunTriggeredByType.REST_API,
             external_trigger=True,
-            dag_version=dag_version,
+            dag_version=DagVersion.get_latest_version(dag.dag_id),
+            state=DagRunState.QUEUED,
             session=session,
-            triggered_by=DagRunTriggeredByType.REST_API,
         )
         dag_run_note = body.note
         if dag_run_note:

airflow/cli/commands/remote_commands/task_command.py (10 changes: 6 additions & 4 deletions)

@@ -66,7 +66,7 @@
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
 from airflow.utils.task_instance_session import set_current_task_instance_session
-from airflow.utils.types import DagRunTriggeredByType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 if TYPE_CHECKING:
     from typing import Literal
@@ -180,12 +180,14 @@ def _get_dag_run(
         return dag_run, True
     elif create_if_necessary == "db":
         dag_run = dag.create_dagrun(
-            state=DagRunState.QUEUED,
-            logical_date=dag_run_logical_date,
             run_id=_generate_temporary_run_id(),
+            logical_date=dag_run_logical_date,
             data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
-            session=session,
+            run_type=DagRunType.MANUAL,
             triggered_by=DagRunTriggeredByType.CLI,
+            dag_version=None,
+            state=DagRunState.QUEUED,
+            session=session,
         )
         return dag_run, True
     raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")

airflow/jobs/scheduler_job_runner.py (39 changes: 20 additions & 19 deletions)

@@ -1344,15 +1344,19 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
                 try:
                     dag.create_dagrun(
-                        run_type=DagRunType.SCHEDULED,
+                        run_id=dag.timetable.generate_run_id(
+                            run_type=DagRunType.SCHEDULED,
+                            logical_date=dag_model.next_dagrun,
+                            data_interval=data_interval,
+                        ),
                         logical_date=dag_model.next_dagrun,
-                        state=DagRunState.QUEUED,
                         data_interval=data_interval,
-                        external_trigger=False,
-                        session=session,
+                        run_type=DagRunType.SCHEDULED,
+                        triggered_by=DagRunTriggeredByType.TIMETABLE,
                         dag_version=latest_dag_version,
+                        state=DagRunState.QUEUED,
                         creating_job_id=self.job.id,
-                        triggered_by=DagRunTriggeredByType.TIMETABLE,
+                        session=session,
                     )
                     active_runs_of_dags[dag.dag_id] += 1
                 # Exceptions like ValueError, ParamValidationError, etc. are raised by
@@ -1448,25 +1452,22 @@ def _create_dag_runs_asset_triggered(
             ).all()
 
             data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events)
-            run_id = dag.timetable.generate_run_id(
-                run_type=DagRunType.ASSET_TRIGGERED,
-                logical_date=logical_date,
-                data_interval=data_interval,
-                session=session,
-                events=asset_events,
-            )
-
             dag_run = dag.create_dagrun(
-                run_id=run_id,
-                run_type=DagRunType.ASSET_TRIGGERED,
+                run_id=dag.timetable.generate_run_id(
+                    run_type=DagRunType.ASSET_TRIGGERED,
+                    logical_date=logical_date,
+                    data_interval=data_interval,
+                    session=session,
+                    events=asset_events,
+                ),
                 logical_date=logical_date,
                 data_interval=data_interval,
-                state=DagRunState.QUEUED,
-                external_trigger=False,
-                session=session,
+                run_type=DagRunType.ASSET_TRIGGERED,
+                triggered_by=DagRunTriggeredByType.ASSET,
                 dag_version=latest_dag_version,
+                state=DagRunState.QUEUED,
                 creating_job_id=self.job.id,
-                triggered_by=DagRunTriggeredByType.ASSET,
+                session=session,
             )
             Stats.incr("asset.triggered_dagruns")
             dag_run.consumed_asset_events.extend(asset_events)

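Both scheduler paths above now compute the run ID up front with dag.timetable.generate_run_id() rather than leaving it to create_dagrun(). A minimal sketch of the two variants, wrapped in hypothetical helpers (the helper names and parameter lists are illustrative; the generate_run_id() keywords are taken from the calls above):

from airflow.utils.types import DagRunType


def scheduled_run_id(dag, dag_model, data_interval):
    # Mirrors _create_dag_runs: the run_id is derived from the run type,
    # the logical date and the data interval.
    return dag.timetable.generate_run_id(
        run_type=DagRunType.SCHEDULED,
        logical_date=dag_model.next_dagrun,
        data_interval=data_interval,
    )


def asset_triggered_run_id(dag, logical_date, data_interval, session, asset_events):
    # Mirrors _create_dag_runs_asset_triggered: asset-driven runs also pass
    # the session and the triggering asset events.
    return dag.timetable.generate_run_id(
        run_type=DagRunType.ASSET_TRIGGERED,
        logical_date=logical_date,
        data_interval=data_interval,
        session=session,
        events=asset_events,
    )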
airflow/models/backfill.py (23 changes: 14 additions & 9 deletions)

@@ -54,6 +54,8 @@
 if TYPE_CHECKING:
     from datetime import datetime
 
+    from airflow.models.dag import DAG
+    from airflow.timetables.base import DagRunInfo
 
 log = logging.getLogger(__name__)
 
@@ -223,8 +225,8 @@ def _do_dry_run(*, dag_id, from_date, to_date, reverse, reprocess_behavior, sess
 
 def _create_backfill_dag_run(
     *,
-    dag,
-    info,
+    dag: DAG,
+    info: DagRunInfo,
     reprocess_behavior: ReprocessBehavior,
     backfill_id,
     dag_run_conf,
@@ -257,18 +259,21 @@ def _create_backfill_dag_run(
             return
         dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
         dr = dag.create_dagrun(
-            triggered_by=DagRunTriggeredByType.BACKFILL,
+            run_id=dag.timetable.generate_run_id(
+                run_type=DagRunType.BACKFILL_JOB,
+                logical_date=info.logical_date,
+                data_interval=info.data_interval,
+            ),
             logical_date=info.logical_date,
             data_interval=info.data_interval,
-            start_date=timezone.utcnow(),
-            state=DagRunState.QUEUED,
-            external_trigger=False,
             conf=dag_run_conf,
             run_type=DagRunType.BACKFILL_JOB,
-            creating_job_id=None,
-            session=session,
-            backfill_id=backfill_id,
+            triggered_by=DagRunTriggeredByType.BACKFILL,
             dag_version=dag_version,
+            state=DagRunState.QUEUED,
+            start_date=timezone.utcnow(),
+            backfill_id=backfill_id,
+            session=session,
         )
         session.add(
             BackfillDagRun(