Refactor DAG.create_dagrun() arguments
This aims to make the interface more straightforward by removing auto-inference logic from the function. Most importantly, it is now entirely the call site's responsibility to provide valid run_id and run_type values, instead of the function automatically inferring one from the other under certain conditions.
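
For illustration, a minimal sketch of the new calling convention, assembled from the call sites updated in this commit; dag, logical_date, data_interval, and session are assumed to be in scope, and the argument values are placeholders rather than part of this change:

from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

# The caller now builds the run_id itself (here via the timetable) and passes
# it together with a matching run_type; create_dagrun() no longer infers one
# from the other.
run_id = dag.timetable.generate_run_id(
    run_type=DagRunType.MANUAL,
    logical_date=logical_date,
    data_interval=data_interval,
)
dag.create_dagrun(
    run_id=run_id,
    run_type=DagRunType.MANUAL,
    logical_date=logical_date,
    data_interval=data_interval,
    triggered_by=DagRunTriggeredByType.REST_API,
    external_trigger=True,
    dag_version=DagVersion.get_latest_version(dag.dag_id),
    state=DagRunState.QUEUED,
    session=session,
)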

The main goal is to simplify the changes needed when we make the logical date an optional (nullable) value. run_id generation is currently based very heavily on the logical date and will need to change when the logical date is None. Removing the inference logic should make it easier to change the run_id generation logic.
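
As a rough illustration of that dependency (a simplified sketch with a hypothetical helper name, not the actual Airflow implementation), the current scheme ties the run_id to the logical date roughly like this:

def _sketch_generate_run_id(run_type: str, logical_date) -> str:
    # Illustration only: today's run_ids embed the logical date (roughly
    # "<run_type>__<logical-date ISO timestamp>"), which is why the generation
    # logic must be revisited once logical_date may be None.
    if logical_date is None:
        raise ValueError("run_id generation currently requires a logical date")
    return f"{run_type}__{logical_date.isoformat()}"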
uranusjr committed Jan 3, 2025
1 parent 83604a0 commit 827cc83
Showing 10 changed files with 143 additions and 158 deletions.
7 changes: 4 additions & 3 deletions airflow/api/common/trigger_dag.py
@@ -101,12 +101,13 @@ def _trigger_dag(
dag_run = dag.create_dagrun(
run_id=run_id,
logical_date=logical_date,
state=DagRunState.QUEUED,
data_interval=data_interval,
conf=run_conf,
run_type=DagRunType.MANUAL,
triggered_by=triggered_by,
external_trigger=True,
dag_version=dag_version,
data_interval=data_interval,
triggered_by=triggered_by,
state=DagRunState.QUEUED,
session=session,
)

9 changes: 4 additions & 5 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -347,18 +347,17 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
dag_version = DagVersion.get_latest_version(dag.dag_id)
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=dag_version,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
triggered_by=DagRunTriggeredByType.REST_API,
)
dag_run_note = post_body.get("note")
if dag_run_note:
20 changes: 14 additions & 6 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -342,7 +342,6 @@ def trigger_dag_run(
f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
)

run_id = body.dag_run_id
logical_date = pendulum.instance(body.logical_date)

try:
@@ -355,18 +354,27 @@
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
dag_version = DagVersion.get_latest_version(dag.dag_id)

if body.dag_run_id:
run_id = body.dag_run_id
else:
run_id = dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=logical_date,
data_interval=data_interval,
)

dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
logical_date=logical_date,
data_interval=data_interval,
state=DagRunState.QUEUED,
conf=body.conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
external_trigger=True,
dag_version=dag_version,
dag_version=DagVersion.get_latest_version(dag.dag_id),
state=DagRunState.QUEUED,
session=session,
triggered_by=DagRunTriggeredByType.REST_API,
)
dag_run_note = body.note
if dag_run_note:
10 changes: 6 additions & 4 deletions airflow/cli/commands/remote_commands/task_command.py
@@ -67,7 +67,7 @@
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
from airflow.utils.task_instance_session import set_current_task_instance_session
from airflow.utils.types import DagRunTriggeredByType
from airflow.utils.types import DagRunTriggeredByType, DagRunType

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
@@ -179,12 +179,14 @@ def _get_dag_run(
return dag_run, True
elif create_if_necessary == "db":
dag_run = dag.create_dagrun(
state=DagRunState.QUEUED,
logical_date=dag_run_logical_date,
run_id=_generate_temporary_run_id(),
logical_date=dag_run_logical_date,
data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
session=session,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.CLI,
dag_version=None,
state=DagRunState.QUEUED,
session=session,
)
return dag_run, True
raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")
39 changes: 20 additions & 19 deletions airflow/jobs/scheduler_job_runner.py
@@ -1346,15 +1346,19 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
try:
dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.SCHEDULED,
logical_date=dag_model.next_dagrun,
data_interval=data_interval,
),
logical_date=dag_model.next_dagrun,
state=DagRunState.QUEUED,
data_interval=data_interval,
external_trigger=False,
session=session,
run_type=DagRunType.SCHEDULED,
triggered_by=DagRunTriggeredByType.TIMETABLE,
dag_version=latest_dag_version,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.TIMETABLE,
session=session,
)
active_runs_of_dags[dag.dag_id] += 1
# Exceptions like ValueError, ParamValidationError, etc. are raised by
@@ -1450,25 +1454,22 @@ def _create_dag_runs_asset_triggered(
).all()

data_interval = dag.timetable.data_interval_for_events(logical_date, asset_events)
run_id = dag.timetable.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=logical_date,
data_interval=data_interval,
session=session,
events=asset_events,
)

dag_run = dag.create_dagrun(
run_id=run_id,
run_type=DagRunType.ASSET_TRIGGERED,
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED,
logical_date=logical_date,
data_interval=data_interval,
session=session,
events=asset_events,
),
logical_date=logical_date,
data_interval=data_interval,
state=DagRunState.QUEUED,
external_trigger=False,
session=session,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
dag_version=latest_dag_version,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.ASSET,
session=session,
)
Stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
23 changes: 14 additions & 9 deletions airflow/models/backfill.py
@@ -54,6 +54,8 @@
if TYPE_CHECKING:
from datetime import datetime

from airflow.models.dag import DAG
from airflow.timetables.base import DagRunInfo

log = logging.getLogger(__name__)

@@ -157,8 +159,8 @@ def validate_sort_ordinal(self, key, val):

def _create_backfill_dag_run(
*,
dag,
info,
dag: DAG,
info: DagRunInfo,
reprocess_behavior: ReprocessBehavior,
backfill_id,
dag_run_conf,
@@ -203,18 +205,21 @@ def _create_backfill_dag_run(
return
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
logical_date=info.logical_date,
data_interval=info.data_interval,
),
logical_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
state=DagRunState.QUEUED,
external_trigger=False,
conf=dag_run_conf,
run_type=DagRunType.BACKFILL_JOB,
creating_job_id=None,
session=session,
backfill_id=backfill_id,
triggered_by=DagRunTriggeredByType.BACKFILL,
dag_version=dag_version,
state=DagRunState.QUEUED,
start_date=timezone.utcnow(),
backfill_id=backfill_id,
session=session,
)
session.add(
BackfillDagRun(
(Diffs for the remaining 4 changed files are not shown.)
