diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 78e0254f62240..18be129a1952c 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,11 +20,11 @@ from datetime import datetime from enum import Enum -from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field, model_validator +import pendulum +from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel from airflow.models import DagRun -from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -82,9 +82,9 @@ class TriggerDAGRunPostBody(StrictBaseModel): """Trigger DAG Run Serializer for POST body.""" dag_run_id: str | None = None + logical_date: AwareDatetime | None data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None - conf: dict = Field(default_factory=dict) note: str | None = None @@ -96,18 +96,16 @@ def check_data_intervals(cls, values): ) return values + ## when logical date is null, the run id should be generated from run_after + random string. + # TODO: AIP83: we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: - self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) + self.dag_run_id = DagRun.generate_run_id( + DagRunType.MANUAL, self.logical_date or pendulum.now("UTC") + ) return self - # Mypy issue https://github.com/python/mypy/issues/1362 - @computed_field # type: ignore[misc] - @property - def logical_date(self) -> datetime: - return timezone.utcnow() - class DAGRunsBatchBody(StrictBaseModel): """List DAG Runs body for batch endpoint.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 486c371682fdb..6d2223e6c25f2 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -10501,6 +10501,12 @@ components: - type: string - type: 'null' title: Dag Run Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date data_interval_start: anyOf: - type: string @@ -10523,6 +10529,8 @@ components: title: Note additionalProperties: false type: object + required: + - logical_date title: TriggerDAGRunPostBody description: Trigger DAG Run Serializer for POST body. TriggerResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 95df203a44ac5..066d7bc06f344 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -66,6 +66,7 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -345,6 +346,7 @@ def trigger_dag_run( ) -> DAGRunResponse: """Trigger a DAG.""" dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) + now = pendulum.now("UTC") if not dm: raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found") @@ -354,7 +356,8 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - logical_date = pendulum.instance(body.logical_date) + logical_date = timezone.coerce_datetime(body.logical_date) + coerced_logical_date = timezone.coerce_datetime(logical_date) try: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) @@ -365,20 +368,11 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) - - if body.dag_run_id: - run_id = body.dag_run_id - else: - run_id = dag.timetable.generate_run_id( - run_type=DagRunType.MANUAL, - logical_date=logical_date, - data_interval=data_interval, - ) + data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now) dag_run = dag.create_dagrun( - run_id=run_id, - logical_date=logical_date, + run_id=cast(str, body.dag_run_id), + logical_date=coerced_logical_date, data_interval=data_interval, run_after=data_interval.end, conf=body.conf, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1d688c3442d27..3e69736e87d58 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5816,6 +5816,18 @@ export const $TriggerDAGRunPostBody = { ], title: "Dag Run Id", }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, data_interval_start: { anyOf: [ { @@ -5858,6 +5870,7 @@ export const $TriggerDAGRunPostBody = { }, additionalProperties: false, type: "object", + required: ["logical_date"], title: "TriggerDAGRunPostBody", description: "Trigger DAG Run Serializer for POST body.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index a63ec17ffadcf..9a2ed62122ada 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1416,6 +1416,7 @@ export type TimeDelta = { */ export type TriggerDAGRunPostBody = { dag_run_id?: string | null; + logical_date: string | null; data_interval_start?: string | null; data_interval_end?: string | null; conf?: { diff --git a/airflow/ui/src/queries/useTrigger.ts b/airflow/ui/src/queries/useTrigger.ts index 2c56bda669539..3b55be037c69f 100644 --- a/airflow/ui/src/queries/useTrigger.ts +++ b/airflow/ui/src/queries/useTrigger.ts @@ -108,6 +108,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce dag_run_id: checkDagRunId, data_interval_end: formattedDataIntervalEnd, data_interval_start: formattedDataIntervalStart, + logical_date: null, note: checkNote, }, }); diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index b24fd99341564..3723232a7cd31 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1139,12 +1139,7 @@ def _dags_for_trigger_tests(self, session=None): "dag_run_id, note, data_interval_start, data_interval_end", [ ("dag_run_5", "test-note", None, None), - ( - "dag_run_6", - "test-note", - "2024-01-03T00:00:00+00:00", - "2024-01-04T05:00:00+00:00", - ), + ("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00"), (None, None, None, None), ], ) @@ -1153,7 +1148,7 @@ def test_should_respond_200( ): fixed_now = timezone.utcnow().isoformat() - request_json = {"note": note} + request_json = {"note": note, "logical_date": fixed_now} if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id if data_interval_start is not None: @@ -1297,29 +1292,34 @@ def test_should_respond_200( ], ) def test_invalid_data(self, test_client, post_body, expected_detail): + now = timezone.utcnow().isoformat() + post_body["logical_date"] = now response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json=post_body) assert response.status_code == 422 assert response.json() == expected_detail @mock.patch("airflow.models.DAG.create_dagrun") def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_client): + now = timezone.utcnow().isoformat() error_message = "Encountered Error" mock_create_dagrun.side_effect = ValueError(error_message) - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={}) + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert response.json() == {"detail": error_message} def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session): + now = timezone.utcnow().isoformat() self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/inactive/dagRuns", json={}) + response = test_client.post("/public/dags/inactive/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'inactive' not found" def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, session): + now = timezone.utcnow().isoformat() self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/import_errors/dagRuns", json={}) + response = test_client.post("/public/dags/import_errors/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert ( response.json()["detail"] @@ -1334,11 +1334,11 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): note = "duplicate logical date test" response_1 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_1, "note": note}, + json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now}, ) response_2 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_2, "note": note}, + json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now}, ) assert response_1.status_code == 200 @@ -1378,9 +1378,14 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): def test_should_response_422_for_missing_start_date_or_end_date( self, test_client, data_interval_start, data_interval_end ): + now = timezone.utcnow().isoformat() response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"data_interval_start": data_interval_start, "data_interval_end": data_interval_end}, + json={ + "data_interval_start": data_interval_start, + "data_interval_end": data_interval_end, + "logical_date": now, + }, ) assert response.status_code == 422 assert ( @@ -1389,21 +1394,50 @@ def test_should_response_422_for_missing_start_date_or_end_date( ) def test_raises_validation_error_for_invalid_params(self, test_client): + now = timezone.utcnow().isoformat() response = test_client.post( f"/public/dags/{DAG2_ID}/dagRuns", - json={"conf": {"validated_number": 5000}}, + json={"conf": {"validated_number": 5000}, "logical_date": now}, ) assert response.status_code == 400 assert "Invalid input for param validated_number" in response.json()["detail"] def test_response_404(self, test_client): - response = test_client.post("/public/dags/randoms/dagRuns", json={}) + now = timezone.utcnow().isoformat() + response = test_client.post("/public/dags/randoms/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found" def test_response_409(self, test_client): - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID}) + now = timezone.utcnow().isoformat() + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now} + ) assert response.status_code == 409 response_json = response.json() assert "detail" in response_json assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + + def test_should_respond_200_with_null_logical_date(self, test_client): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={"logical_date": None}, + ) + assert response.status_code == 200 + assert response.json() == { + "dag_run_id": mock.ANY, + "dag_id": DAG1_ID, + "logical_date": None, + "queued_at": mock.ANY, + "start_date": None, + "end_date": None, + "data_interval_start": mock.ANY, + "data_interval_end": mock.ANY, + "last_scheduling_decision": None, + "run_type": "manual", + "state": "queued", + "external_trigger": True, + "triggered_by": "rest_api", + "conf": {}, + "note": None, + } diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index b7c2f2282c8be..f4324abf4850a 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -356,10 +356,11 @@ def test_dagrun_no_deadlock_with_depends_on_past(self, dag_maker, session): run_type=DagRunType.SCHEDULED, start_date=DEFAULT_DATE, ) - dr2 = dag_maker.create_dagrun_after( - dr, + next_date = DEFAULT_DATE + datetime.timedelta(days=1) + dr2 = dag_maker.create_dagrun( run_id="test_dagrun_no_deadlock_2", start_date=DEFAULT_DATE + datetime.timedelta(days=1), + logical_date=next_date, ) ti1_op1 = dr.get_task_instance(task_id="dop") dr2.get_task_instance(task_id="dop")