AIP-72: Handling up_for_retry task instance states for AirflowTaskTimeout and AirflowException #44981

Closed
wants to merge 10 commits into from
8 changes: 2 additions & 6 deletions airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -201,12 +201,8 @@ def ti_update_state(

     if isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
-        query = query.values(
-            state=ti_patch_payload.state,
-        )
-        if ti_patch_payload.state == State.FAILED:
-            # clear the next_method and next_kwargs
-            query = query.values(next_method=None, next_kwargs=None)
+        # clear the next_method and next_kwargs for all terminal states, as we do not want retries to pick them
+        query = query.values(state=ti_patch_payload.state, next_method=None, next_kwargs=None)
     elif isinstance(ti_patch_payload, TIDeferredStatePayload):
         # Calculate timeout if it was passed
         timeout = None
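The hunk above stops special-casing FAILED: every terminal-state update now resets next_method and next_kwargs in the same values(...) call that sets the state. A minimal, framework-free sketch of that behavior follows. FakeTI, terminal_update_values and apply_update are hypothetical names used only for illustration, not Airflow or SQLAlchemy APIs, and the "execute_complete" value is just sample data.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance row (not the Airflow model)."""

    state: str
    next_method: Optional[str] = None
    next_kwargs: Optional[Dict[str, Any]] = None


def terminal_update_values(state: str) -> Dict[str, Any]:
    """Column values written for any terminal-state payload.

    The deferral bookkeeping is cleared regardless of which terminal
    state is reported, so a later try cannot pick it up.
    """
    return {"state": state, "next_method": None, "next_kwargs": None}


def apply_update(ti: FakeTI, values: Dict[str, Any]) -> FakeTI:
    """Apply the column values to the in-memory row."""
    for column, value in values.items():
        setattr(ti, column, value)
    return ti


# A task instance that was resumed from deferral and then finished.
ti = FakeTI(state="running", next_method="execute_complete", next_kwargs={"event": "done"})
apply_update(ti, terminal_update_values("success"))
```

Before this change, the same sequence with "success" or "skipped" would have left next_method and next_kwargs populated, since only FAILED cleared them.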
4 changes: 2 additions & 2 deletions airflow/utils/state.py
@@ -39,6 +39,7 @@ class TerminalTIState(str, Enum):
     FAILED = "failed"
     SKIPPED = "skipped"  # A user can raise a AirflowSkipException from a task & it will be marked as skipped
     REMOVED = "removed"
+    UP_FOR_RETRY = "up_for_retry"  # We do not need to do anything actionable for this state, hence it is a terminal state.
Contributor

"Terminal state" means the task is done; up_for_retry does not really feel like a terminal state, since the task is going to be retried.

Contributor

This highlights an ambiguity/conflict: the TI try is done, but the TI is not.
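To make the distinction in this thread concrete, here is a minimal sketch, assuming two separate notions of "finished": one for a single try and one for the task instance as a whole. TIState, TRY_FINISHED, TI_FINISHED and both helper functions are hypothetical names for illustration only; they are not part of Airflow.

```python
from enum import Enum


class TIState(str, Enum):
    """Hypothetical, trimmed-down state enum for illustration."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UP_FOR_RETRY = "up_for_retry"

    def __str__(self) -> str:
        return self.value


# States in which the current try has stopped executing.
TRY_FINISHED = frozenset(
    {TIState.SUCCESS, TIState.FAILED, TIState.SKIPPED, TIState.UP_FOR_RETRY}
)

# States in which the task instance will not run again on its own.
TI_FINISHED = frozenset({TIState.SUCCESS, TIState.FAILED, TIState.SKIPPED})


def try_is_finished(state: TIState) -> bool:
    return state in TRY_FINISHED


def ti_is_finished(state: TIState) -> bool:
    return state in TI_FINISHED
```

Under these assumed groupings, UP_FOR_RETRY is finished from the point of view of the try (nothing more to do for the worker) but not from the point of view of the task instance (the scheduler will run it again), which is the conflict the reviewers describe.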


     def __str__(self) -> str:
         return self.value
@@ -50,7 +51,6 @@ class IntermediateTIState(str, Enum):
     SCHEDULED = "scheduled"
     QUEUED = "queued"
     RESTARTING = "restarting"
-    UP_FOR_RETRY = "up_for_retry"
     UP_FOR_RESCHEDULE = "up_for_reschedule"
     UPSTREAM_FAILED = "upstream_failed"
     DEFERRED = "deferred"
@@ -80,7 +80,7 @@ class TaskInstanceState(str, Enum):
     SUCCESS = TerminalTIState.SUCCESS  # Task completed
     RESTARTING = IntermediateTIState.RESTARTING  # External request to restart (e.g. cleared when running)
     FAILED = TerminalTIState.FAILED  # Task errored out
-    UP_FOR_RETRY = IntermediateTIState.UP_FOR_RETRY  # Task failed but has retries left
+    UP_FOR_RETRY = TerminalTIState.UP_FOR_RETRY  # Task failed but has retries left
Member

This should be IntermediateTIState, i.e. the task will be "retried", as opposed to success, failed, skipped, etc., where the TI is completed.

Member

I think in DM we discussed that UPSTREAM_FAILED should be TerminalTIState, not UP_FOR_RETRY.

     UP_FOR_RESCHEDULE = IntermediateTIState.UP_FOR_RESCHEDULE  # A waiting `reschedule` sensor
     UPSTREAM_FAILED = IntermediateTIState.UPSTREAM_FAILED  # One or more upstream deps failed
     SKIPPED = TerminalTIState.SKIPPED  # Skipped by branching or some other mechanism
2 changes: 1 addition & 1 deletion task_sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -63,7 +63,6 @@ class IntermediateTIState(str, Enum):
     SCHEDULED = "scheduled"
     QUEUED = "queued"
     RESTARTING = "restarting"
-    UP_FOR_RETRY = "up_for_retry"
     UP_FOR_RESCHEDULE = "up_for_reschedule"
     UPSTREAM_FAILED = "upstream_failed"
     DEFERRED = "deferred"
@@ -119,6 +118,7 @@ class TerminalTIState(str, Enum):
     FAILED = "failed"
     SKIPPED = "skipped"
     REMOVED = "removed"
+    UP_FOR_RETRY = "up_for_retry"


 class ValidationError(BaseModel):
@@ -150,6 +150,7 @@ def teardown_method(self):
             (State.SUCCESS, DEFAULT_END_DATE, State.SUCCESS),
             (State.FAILED, DEFAULT_END_DATE, State.FAILED),
             (State.SKIPPED, DEFAULT_END_DATE, State.SKIPPED),
+            (State.UP_FOR_RETRY, DEFAULT_END_DATE, State.UP_FOR_RETRY),
         ],
     )
     def test_ti_update_state_to_terminal(