
AIP-103: Add Execution API endpoints for task and asset states #66073

Merged: amoghrajesh merged 8 commits into apache:main from astronomer:aip-103-2-exec-api-and-sdk-comms on May 4, 2026

Conversation

@amoghrajesh amoghrajesh commented Apr 29, 2026


Was generative AI tooling used to co-author this PR?
  • Yes - claude sonnet 4.6

Only the last commit is relevant; this PR is built on top of #65759.

closes: #66069

This PR is part of AIP-103 (Task State Management) and adds the execution API endpoints for task and asset state.

Summary

  • New endpoints under /execution/state/:
    • GET/PUT/DELETE /state/ti/{task_instance_id}/{key} and
      DELETE /state/ti/{task_instance_id} to clear all keys for a task
    • GET/PUT/DELETE /state/asset/{name}/{key} and
      DELETE /state/asset/{name} to clear all keys for an asset
  • Routes call the configured [state_store] backend (defaults to
    MetastoreStateBackend from PR 1). Custom worker-side backends
    bypass the API entirely (although I think I should hardcode
    MetastoreStateBackend in API server since we do not want state
    backend credentials there?)
  • Task state is JWT-scoped via Security(require_auth, scopes=["ti:self"])
    on the router — a task can only access its own state.
  • Single backend instance shared across both routers via a cached
    get_state_backend() in airflow.state.
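
The "single backend instance shared across both routers" point can be pictured with a small sketch. This is not the code in airflow.state: `InMemoryStateBackend` is a hypothetical stand-in for the configured backend, and the use of `functools.cache` is an assumption about how the caching might be done. Only the name `get_state_backend` comes from the description above.

```python
from __future__ import annotations

from functools import cache


class InMemoryStateBackend:
    """Hypothetical stand-in for the configured state backend."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], str] = {}

    def set(self, scope: str, key: str, value: str) -> None:
        self._data[(scope, key)] = value

    def get(self, scope: str, key: str) -> str | None:
        return self._data.get((scope, key))


@cache
def get_state_backend() -> InMemoryStateBackend:
    # Built once on the first call; every later call returns the same object.
    return InMemoryStateBackend()


# Two "routers" resolving the backend independently still share one instance.
task_router_backend = get_state_backend()
asset_router_backend = get_state_backend()
task_router_backend.set("ti:1234", "watermark", "2026-04-29")
```

Because both lookups return the same object, a value written through one router's backend is visible through the other.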

A few design choices worth flagging

  • API first model for custom state backends or not?

Two shapes:

A: API resolves backend, i.e. worker → API → could be DB, S3, Redis, or even metastore.

B: API = always DB, i.e. worker → backend and API → metastore only. Custom backends live worker-side only and bypass the API: anything stored on a custom state backend is written by the worker itself; if no custom backend is configured, state falls through to the metastore via the API.

Why Shape B?

  1. XCom does this. One axis instead of two. Default is through API, custom backend is direct.
  2. API stays minimal. No S3/Redis deps on the API server.
  3. Lower latency. Custom backend = direct write, no API trip.

But I would love to hear thoughts on A. If not, I can hardcode MetastoreStateBackend in the API endpoints.
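
To make Shape B concrete, here is a minimal sketch of the worker-side decision. All class and function names (`ExecutionApiWriter`, `DirectBackendWriter`, `resolve_writer`) are hypothetical and only illustrate the routing; they are not part of the Task SDK.

```python
from __future__ import annotations

from typing import Protocol


class StateWriter(Protocol):
    def set(self, key: str, value: str) -> str: ...


class ExecutionApiWriter:
    """Hypothetical default path: worker -> Execution API -> metastore."""

    def set(self, key: str, value: str) -> str:
        return f"PUT /state/ti/<ti_id>/{key} (via API, stored in metastore)"


class DirectBackendWriter:
    """Hypothetical custom path: worker -> backend, no API round trip."""

    def __init__(self, backend_name: str) -> None:
        self.backend_name = backend_name

    def set(self, key: str, value: str) -> str:
        return f"{self.backend_name}.set({key!r}) (direct from worker)"


def resolve_writer(custom_backend: str | None) -> StateWriter:
    # Shape B has a single axis: a configured custom backend is used directly,
    # otherwise the worker falls through to the API, which always uses the DB.
    if custom_backend:
        return DirectBackendWriter(custom_backend)
    return ExecutionApiWriter()
```

With this shape the API server never needs to import S3 or Redis client libraries, because only the worker ever constructs a custom backend.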

  • Asset state routes use name (not integer asset_id) in the URL. Asset names are unique, directly available on the task's Asset object at runtime (no extra lookup), and consistent with how /assets/by-name already works. The integer asset_id is an internal DB detail that shouldn't leak into the API surface.

  • Why we don't pass the route's session into the backend. First attempt threaded session=session from SessionDep into backend.set(...). mypy rightly complained — BaseStateBackend.set doesn't declare a session kwarg, only MetastoreStateBackend.set does (via @provide_session). I went back and forth on adding *, session: Any = None to the abstract methods, but it leaks a SQLAlchemy concept into an interface that S3/Redis/GCS backends have no use for.

    So: the backend manages its own session via @provide_session / create_session(). Routes keep SessionDep for non-backend lookups (TaskInstance / AssetModel existence); those are reads, so no commit is needed. The cost is one extra session per state op, which is negligible and worth it to keep the abstraction clean.
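
A sketch of that separation, using only the standard library. `BaseStateBackend` here mirrors the idea of a session-free interface but its method signatures are assumptions, and `SqliteStateBackend` is a hypothetical stand-in for a DB-backed backend that opens its own connection on every call instead of receiving one from the route.

```python
from __future__ import annotations

import os
import sqlite3
import tempfile
from abc import ABC, abstractmethod
from contextlib import closing


class BaseStateBackend(ABC):
    """Storage-agnostic interface: no session or connection argument."""

    @abstractmethod
    def set(self, scope: str, key: str, value: str) -> None: ...

    @abstractmethod
    def get(self, scope: str, key: str) -> str | None: ...


class SqliteStateBackend(BaseStateBackend):
    """Hypothetical DB backend that manages its own connection per call."""

    def __init__(self, path: str) -> None:
        self._path = path
        with closing(sqlite3.connect(self._path)) as conn, conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS state (scope TEXT, state_key TEXT, "
                "state_value TEXT, PRIMARY KEY (scope, state_key))"
            )

    def set(self, scope: str, key: str, value: str) -> None:
        # The inner `conn` context commits; `closing` then closes the connection.
        with closing(sqlite3.connect(self._path)) as conn, conn:
            conn.execute(
                "INSERT OR REPLACE INTO state (scope, state_key, state_value) VALUES (?, ?, ?)",
                (scope, key, value),
            )

    def get(self, scope: str, key: str) -> str | None:
        with closing(sqlite3.connect(self._path)) as conn:
            row = conn.execute(
                "SELECT state_value FROM state WHERE scope = ? AND state_key = ?",
                (scope, key),
            ).fetchone()
        return row[0] if row else None


db_path = os.path.join(tempfile.mkdtemp(), "state.db")
backend = SqliteStateBackend(db_path)
backend.set("ti:1234", "cursor", "42")
```

A non-SQL backend can implement the same two methods without ever seeing a session object, which is the property the paragraph above argues for.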

  • No Cadwyn migration. These are net-new endpoints. Old clients that don't call them are unaffected.

  • Mapped task semantics. PUT, GET, single-key DELETE, and default DELETE-all all stay scoped to one mapped instance — task_instance_id resolves to a unique (dag_id, run_id, task_id, map_index). Fleet-wide wipe (across every map_index) is opt-in via ?all_map_indices=true on DELETE-all. Destructive bulk is never the default; SDK passes the flag only when the caller asks for it.
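
The mapped-task rule can be sketched against a plain dictionary. The store layout and the `delete_all` helper are hypothetical; only the `all_map_indices` flag name and the (dag_id, run_id, task_id, map_index) identity come from the description above.

```python
from __future__ import annotations

# (dag_id, run_id, task_id, map_index, key) -> value
StateKey = tuple[str, str, str, int, str]


def delete_all(
    store: dict[StateKey, str],
    dag_id: str,
    run_id: str,
    task_id: str,
    map_index: int,
    *,
    all_map_indices: bool = False,
) -> int:
    """Delete every key for one mapped instance, or for all of them if asked."""

    def matches(k: StateKey) -> bool:
        same_task = k[:3] == (dag_id, run_id, task_id)
        return same_task and (all_map_indices or k[3] == map_index)

    doomed = [k for k in store if matches(k)]
    for k in doomed:
        del store[k]
    return len(doomed)


store = {("d", "r", "square", i, "k"): "v" for i in (0, 1, 4)}
# Default: only map_index 4 is cleared.
removed_default = delete_all(store, "d", "r", "square", 4)
# Opt-in: the remaining map indices (0 and 1) are cleared together.
removed_all = delete_all(store, "d", "r", "square", 0, all_map_indices=True)
```

The destructive branch is reachable only through an explicit keyword argument, which mirrors the "never the default" rule.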

What's deferred / todo so far

  • Per-task asset registration check. AIP-103 defined that the JWT security model "should be updated to only allow tasks to modify those Assets they are already registered with". Same gap exists in /assets and /asset-events today — the proper fix is a unified check across all asset routes, which I will probably try before merge here. TODO in asset_state.py.
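
As a rough illustration of what such a check might look like, the sketch below treats "registered with" as "declared as an inlet or outlet of the task". That interpretation, the `TaskAssets` type, and the `may_modify_asset_state` helper are all assumptions for illustration, not the planned implementation.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TaskAssets:
    """Hypothetical view of the asset names a task declares."""

    inlets: frozenset[str] = frozenset()
    outlets: frozenset[str] = frozenset()


def may_modify_asset_state(task: TaskAssets, asset_name: str) -> bool:
    # Assumption: a task is "registered with" an asset if it lists the asset
    # as an inlet or an outlet.
    return asset_name in task.inlets or asset_name in task.outlets


producer = TaskAssets(outlets=frozenset({"aip103_test_asset"}))
unrelated = TaskAssets()
```

A unified check of this kind could be shared by the asset state routes and the existing /assets and /asset-events routes.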

Testing Manually

Setup

  1. Breeze running: breeze start-airflow --backend postgres --load-example-dags
  2. A DAG triggered so at least one TaskInstance exists
  3. Run this script inside the Breeze container and use the token it prints:
from sqlalchemy import select

from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session

# NOTE: _jwt_generator is used below, but its import is not shown in the
# original script; import it from wherever it is defined in your checkout.


def main() -> None:
    from airflow.configuration import conf

    # Print the signing configuration so a token/validator mismatch is easy to spot.
    secret = conf.get("api_auth", "jwt_secret", fallback=None)
    pkey_path = conf.get("api_auth", "jwt_private_key_path", fallback=None)
    print(
        f"[diag] jwt_secret first 8 chars: {(secret or '')[:8]!r} (empty means using private key file or auto-gen)"
    )
    print(f"[diag] jwt_private_key_path: {pkey_path!r}")
    print(f"[diag] jwt_audience: {conf.get('execution_api', 'jwt_audience', fallback=None)!r}")

    with create_session() as session:
        # Any existing task instance will do; its id becomes the token subject.
        ti = session.scalar(select(TaskInstance).limit(1))
        if ti is None:
            print(
                "No task instances found. Trigger a DAG first (e.g. `airflow dags trigger example_bash_operator`)."
            )
            return
        gen = _jwt_generator()
        token = gen.generate(extras={"sub": str(ti.id), "scope": "execution"})
        print(f"ti_id:  {ti.id}")
        print(f"dag_id: {ti.dag_id}  run_id: {ti.run_id}  task_id: {ti.task_id}  map_index: {ti.map_index}")
        print(f"token:  {token}")


if __name__ == "__main__":
    main()

Example:

[Breeze:3.10.20] root@f43294940632:/opt/airflow$ python dev/gen_ti_token.py
2026-04-29T08:26:29.764581Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-04-29T08:26:29.765573Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-04-29T08:26:29.766389Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-04-29T08:26:29.766492Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-04-29T08:26:29.766558Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-04-29T08:26:29.766622Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
[diag] jwt_secret first 8 chars: 'tIyIg3gY' (empty means using private key file or auto-gen)
[diag] jwt_private_key_path: None
[diag] jwt_audience: 'urn:airflow.apache.org:task'
2026-04-29T08:26:30.272426Z [warning  ] The HMAC key is 24 bytes long, which is below the minimum recommended length of 64 bytes for SHA512. See RFC 7518 Section 3.2. [py.warnings] category=InsecureKeyLengthWarning filename=/usr/python/lib/python3.10/site-packages/jwt/api_jwt.py lineno=147
ti_id:  019dd858-888f-7160-b16f-033390d4386a
dag_id: my_dag  run_id: manual__2026-04-29T08:26:14.011123+00:00  task_id: t1  map_index: -1
token:  eyJhbGciOiJIUzUxMiIsImtpZCI6Im5vdC11c2VkIiwidHlwIjoiSldUIn0.eyJzdWIiOiIwMTlkZDg1OC04ODhmLTcxNjAtYjE2Zi0wMzMzOTBkNDM4NmEiLCJzY29wZSI6ImV4ZWN1dGlvbiIsImp0aSI6Ijc2NmM2MDgyZDEzZTQ1MzU4Y2U5YjNkMmZmMjZhOWRiIiwiaXNzIjoiYWlyZmxvdyIsImF1ZCI6InVybjphaXJmbG93LmFwYWNoZS5vcmc6dGFzayIsIm5iZiI6MTc3NzQ1MTE5MCwiZXhwIjoxNzc3NDUxNzkwLCJpYXQiOjE3Nzc0NTExOTB9.Y-nAYstD5zcBiX7JcXqCml1-sIFlmjqjkAYSNbOOQNaRmdMaYPmxcTIAX1fKr0a_fgRyla3sGOMhjdACBD_sEQ
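
To check which task instance a token like the one above is bound to, the payload segment can be decoded locally. The helper below is a minimal sketch using only the standard library; it does not verify the signature, so it is useful for inspection only. The sample token is built in the snippet itself and is not a real Airflow token.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Return the claims of a JWT without verifying its signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments drop base64 padding; restore it before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def _b64(obj: dict) -> str:
    # Encode a dict the way a JWT segment is encoded (URL-safe, no padding).
    return base64.urlsafe_b64encode(json.dumps(obj).encode()).rstrip(b"=").decode()


sample_token = ".".join(
    [
        _b64({"alg": "none", "typ": "JWT"}),
        _b64({"sub": "019dd858-888f-7160-b16f-033390d4386a", "scope": "execution"}),
        "signature-not-checked",
    ]
)
claims = decode_jwt_payload(sample_token)
```

The `sub` claim should equal the ti_id printed by the script, since that is the task instance the state routes will allow the token to touch.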
  4. I created a Postman collection with the AIP-103 related API endpoints, but I will showcase running them below.

Testing

Regular Tasks

  1. Put on task_state
image

Validation:

image
  1. Get task state
image
  1. Delete task state
image

Validation:
image

  1. Write and then task state for same task
image image image
  1. Demonstrating clear_all
  • First adding one more key
image image
  • Sending API call
image

All gone:
image
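
For readers without the Postman collection, the calls exercised above can be approximated with the standard library. This sketch only builds the requests; it does not send them. The base URL, the `{"value": ...}` request body, and the `state_request` helper are assumptions for illustration, while the /state/ti/... and /state/asset/... path shapes come from the summary.

```python
from __future__ import annotations

import json
from urllib.parse import quote
from urllib.request import Request

# Assumption: Execution API reachable at this address in a local Breeze setup.
BASE_URL = "http://localhost:8080/execution"


def state_request(method: str, path_parts: list[str], token: str, value: str | None = None) -> Request:
    """Build (but do not send) a request against the state endpoints."""
    # Percent-encode each segment so keys or asset names containing "/" stay one segment.
    path = "/".join(quote(part, safe="") for part in path_parts)
    # Assumption: PUT carries a JSON body of the form {"value": ...}.
    data = None if value is None else json.dumps({"value": value}).encode()
    headers = {"Authorization": f"Bearer {token}"}
    if data is not None:
        headers["Content-Type"] = "application/json"
    return Request(f"{BASE_URL}/state/{path}", data=data, headers=headers, method=method)


ti_id = "019dd858-888f-7160-b16f-033390d4386a"
put_req = state_request("PUT", ["ti", ti_id, "watermark"], "<token>", value="2026-04-29")
get_req = state_request("GET", ["ti", ti_id, "watermark"], "<token>")
clear_req = state_request("DELETE", ["asset", "orders/daily"], "<token>")
```

Passing any of these objects to `urllib.request.urlopen` would send it, assuming the server is running and the token is valid.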

Mapped Tasks

  • PUT, GET, and single-key DELETE behave exactly as for regular tasks, because ti_id is unique per mapped task instance and resolves to the right map_index. DELETE-all is also scoped to that one mapped instance by default: a worker calling DELETE /state/ti/{ti_id} clears only the state for its own map_index. Wiping state across every map_index of the task requires the explicit ?all_map_indices=true flag. Showcasing that below.

Using this DAG:

from datetime import datetime

from airflow.sdk import DAG, task


with DAG(
    dag_id="example_mapped_tasks",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def generate_numbers():
        return [1, 2, 3, 4, 5]

    @task
    def square(x: int):
        return x * x

    numbers = generate_numbers()

    squared = square.expand(x=numbers)

Ran it and this is TI:
image

Pushed in task_state for map_index: 0,1,4

image
  1. Now trying DELETE ALL with ti_id: 00000000-0000-0000-0000-000000000001 (Negative case)
image
  1. Running DELETE ALL with the TI whose map_index is 4
image

(It deleted only the task_state for map_index 4)

  1. Running DELETE ALL with the TI whose map_index is -1 (019ddd86-34b6-7eb5-9dcb-96803d1c49f5)
image
  1. Passing the destructive flag all_map_indices to check that it deletes everything
image

All have been deleted

image

Assets being tested

  • Used this DAG, triggered it once, and created a JWT token as before:
from __future__ import annotations

import pendulum

from airflow.sdk import DAG, Asset, task

aip103_test_asset = Asset(
    name="aip103_test_asset",
    uri="s3://aip103-test/watermarks/orders",
    group="asset",
)


with DAG(
    dag_id="aip103_asset_producer",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["aip-103", "asset-state-test"],
):

    @task(outlets=[aip103_test_asset])
    def produce():
        print(f"Producer running for {aip103_test_asset.uri!r}")

    produce()


with DAG(
    dag_id="aip103_asset_consumer",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["aip-103", "asset-state-test"],
):

    @task(inlets=[aip103_test_asset])
    def consume():
        print(f"Consumer running for {aip103_test_asset.uri!r}")

    consume()

5a. Put asset state:

image image

5b. Get asset state:

image

5c. Delete asset state

image image

5d. Delete all asset state

To demonstrate, first add multiple keys for the same asset:

image

Running delete all request

image

All gone:

image

Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py Outdated

@jscheffl jscheffl left a comment

Looks good in general.

@amoghrajesh amoghrajesh force-pushed the aip-103-2-exec-api-and-sdk-comms branch from 9912b2d to c6a1a3b Compare April 30, 2026 08:14
@amoghrajesh amoghrajesh marked this pull request as ready for review April 30, 2026 08:46
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py Outdated
Comment thread airflow-core/src/airflow/state/__init__.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py Outdated
@amoghrajesh amoghrajesh requested review from jscheffl and kaxil May 1, 2026 13:44
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state.py Outdated
@amoghrajesh amoghrajesh requested a review from kaxil May 2, 2026 14:36
@amoghrajesh amoghrajesh added the "full tests needed" label May 3, 2026
@amoghrajesh

Rerunning with full tests

@amoghrajesh amoghrajesh closed this May 3, 2026
@github-project-automation github-project-automation Bot moved this from In progress to Done in AIP-103: Task State Management May 3, 2026
@amoghrajesh amoghrajesh reopened this May 3, 2026
@amoghrajesh

Merging this PR now to unblock further PRs. I have handled all the review comments on the PR. Thanks for the reviews, @kaxil @jscheffl!

@amoghrajesh amoghrajesh merged commit ffb1b8a into apache:main May 4, 2026
279 checks passed
@amoghrajesh amoghrajesh deleted the aip-103-2-exec-api-and-sdk-comms branch May 4, 2026 04:35
github-actions Bot commented May 4, 2026

Backport failed to create: v3-2-test. View the failure log Run details

Note: As of "Merging PRs targeted for Airflow 3.X", the committer who merges the PR is responsible for backporting the PRs that are bug fixes (generally speaking) to the maintenance branches.

If in doubt, please ask in the #release-management Slack channel.

Status Branch Result
v3-2-test Commit Link

You can attempt to backport this manually by running:

cherry_picker ffb1b8a v3-2-test

This should apply the commit to the v3-2-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

If you don't have cherry-picker installed, see the installation guide.

@amoghrajesh

No need to rebase


Labels

  • area:API (Airflow's REST/HTTP API)
  • area:ConfigTemplates
  • area:db-migrations (PRs with DB migration)
  • area:dev-tools
  • area:task-sdk
  • full tests needed (We need to run full set of tests for this PR to merge)
  • kind:documentation

Projects

Development

Successfully merging this pull request may close these issues.

Task and Asset Execution APIs

3 participants