AIP-103: Wiring up task SDK comms and context accessors #66160

Merged
amoghrajesh merged 16 commits into apache:main from astronomer:aip-103-3-task-sdk-comms
May 7, 2026

@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Apr 30, 2026

closes: #65779

This PR is part of AIP-103 (Task State Management) and is the third in the series. It adds the Task SDK layer: comms message types, supervisor proxy, and context accessors that wire context['task_state'] and context['asset_state'] into task execution.

What does this PR include?

Comms layer (comms.py):

  • GetTaskState, SetTaskState, DeleteTaskState, ClearTaskState (with all_map_indices: bool field)
  • GetAssetStateByName/GetAssetStateByUri, SetAssetStateByName/SetAssetStateByUri, DeleteAssetStateByName/DeleteAssetStateByUri, ClearAssetStateByName/ClearAssetStateByUri — separate typed classes per addressing mode, matching the GetAssetByName/GetAssetByUri convention
  • GetAssetsByAlias + AssetsByAliasResult — resolves an AssetAlias inlet to its concrete assets at context build time
  • TaskStateResult, AssetStateResult result types
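
To illustrate the "separate typed classes per addressing mode" point, here is a minimal sketch. It uses plain dataclasses rather than the real comms base classes, and every field name other than all_map_indices is an assumption made for the example, not the actual definition in comms.py.

```python
from dataclasses import asdict, dataclass
from typing import Literal


# Stand-ins for illustration only. The real classes live in comms.py and
# their fields (other than all_map_indices) are assumed here.
@dataclass(frozen=True)
class GetAssetStateByName:
    name: str
    key: str
    type: Literal["GetAssetStateByName"] = "GetAssetStateByName"


@dataclass(frozen=True)
class GetAssetStateByUri:
    uri: str
    key: str
    type: Literal["GetAssetStateByUri"] = "GetAssetStateByUri"


@dataclass(frozen=True)
class ClearTaskState:
    ti_id: str
    # Assumed default: only the current map index is cleared unless the caller opts in.
    all_map_indices: bool = False
    type: Literal["ClearTaskState"] = "ClearTaskState"


# Each addressing mode serializes with its own discriminator in "type".
by_name = asdict(GetAssetStateByName(name="test_asset", key="watermark"))
by_uri = asdict(GetAssetStateByUri(uri="s3://bucket/test", key="watermark"))
```

Because each class carries its own "type" discriminator, the receiving side can tell a by-name request from a by-uri request without inspecting which optional field happens to be set.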

Supervisor (supervisor.py): handler branches proxying the above messages to the Execution API endpoints from PR #66073.
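
The proxying idea can be sketched roughly as follows. This is not the supervisor code from this PR: the message fields, the FakeTaskStateClient class, and the handle_request signature are all hypothetical stand-ins, with an in-memory dict in place of the Execution API.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical message shapes; the real ones are defined in comms.py.
@dataclass
class GetTaskState:
    ti_id: str
    key: str


@dataclass
class SetTaskState:
    ti_id: str
    key: str
    value: str


@dataclass
class TaskStateResult:
    key: str
    value: Optional[str]


class FakeTaskStateClient:
    """In-memory stand-in for the API client's task_state operations."""

    def __init__(self) -> None:
        self._store: dict = {}

    def get(self, ti_id: str, key: str) -> Optional[str]:
        return self._store.get((ti_id, key))

    def set(self, ti_id: str, key: str, value: str) -> None:
        self._store[(ti_id, key)] = value


def handle_request(client: FakeTaskStateClient, msg: object) -> Optional[TaskStateResult]:
    """Route one task-side message to the matching client call."""
    if isinstance(msg, GetTaskState):
        return TaskStateResult(key=msg.key, value=client.get(msg.ti_id, msg.key))
    if isinstance(msg, SetTaskState):
        client.set(msg.ti_id, msg.key, msg.value)
        return None
    raise TypeError(f"Unhandled message type: {type(msg).__name__}")
```

The task process never talks to the API directly in this sketch; it only sends messages, and the handler decides which client call each one maps to.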

Client (client.py): TaskStateOperations and AssetStateOperations classes exposed as client.task_state and client.asset_state. AssetStateOperations has a _resolve_endpoint helper that builds the by-name/{op} or by-uri/{op} endpoint + params, keeping get/set/delete/clear each a one-liner. AssetOperations.get_by_alias() resolves alias → concrete assets.
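
A rough sketch of what a helper like _resolve_endpoint could look like, assuming it takes an operation name plus exactly one of name or uri. The function name, the query parameter names, and the error handling below are assumptions for illustration; only the by-name/{op} and by-uri/{op} shapes come from the description above.

```python
from __future__ import annotations


def resolve_endpoint(
    op: str, *, name: str | None = None, uri: str | None = None
) -> tuple[str, dict[str, str]]:
    """Return (relative path, query params) for a by-name or by-uri call.

    Hypothetical helper: exactly one of name or uri must be provided.
    """
    if (name is None) == (uri is None):
        raise ValueError("Provide exactly one of 'name' or 'uri'")
    if name is not None:
        return f"by-name/{op}", {"name": name}
    # uri cannot be None here because of the check above.
    return f"by-uri/{op}", {"uri": str(uri)}
```

With the addressing decision isolated in one place, each of get/set/delete/clear only needs to pass its own op and forward the returned path and params to the HTTP call.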

Execution API (routes/assets.py): GET /assets/by-alias?alias_name=... wrapping the existing expand_alias_to_assets() DB function. Cadwyn migration added in v2026_04_17.py.

Context accessors (context.py):

  • TaskStateAccessor — always available as context['task_state']
  • AssetStateAccessors container + AssetStateAccessor (per-asset):
    • context['asset_state'][MY_ASSET].get('watermark') — keyed by Asset | AssetNameRef | AssetUriRef | AssetAlias, consistent with inlet_events[asset]
    • Single-inlet sugar: context['asset_state'].get('watermark') proxies through when exactly one concrete inlet exists; raises ValueError with a clear message for multi-inlet tasks
    • AssetAlias inlets are resolved to their concrete assets at context build time via GetAssetsByAlias comms — context['asset_state'][Asset(name="a")] works when the alias maps to that asset. If the alias resolves to nothing, asset_state is present but empty.
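
The keyed access and the single-inlet sugar described above can be sketched like this. It is a simplified model, not the accessor code in context.py: assets are keyed by plain name strings instead of Asset objects, and a shared dict stands in for the comms round trip.

```python
class AssetStateAccessor:
    """Per-asset view over a shared store (stand-in for the comms-backed accessor)."""

    def __init__(self, asset_name: str, store: dict) -> None:
        self._asset_name = asset_name
        self._store = store

    def get(self, key: str, default=None):
        return self._store.get((self._asset_name, key), default)

    def set(self, key: str, value) -> None:
        self._store[(self._asset_name, key)] = value


class AssetStateAccessors:
    """Container keyed by asset; proxies get/set when there is exactly one inlet."""

    def __init__(self, asset_names: list, store: dict) -> None:
        self._accessors = {name: AssetStateAccessor(name, store) for name in asset_names}

    def __getitem__(self, asset_name: str) -> AssetStateAccessor:
        return self._accessors[asset_name]

    def __len__(self) -> int:
        return len(self._accessors)

    def _single(self) -> AssetStateAccessor:
        if len(self._accessors) != 1:
            raise ValueError(
                f"Task has {len(self._accessors)} asset inlets; "
                "use asset_state[asset] to pick one explicitly"
            )
        return next(iter(self._accessors.values()))

    def get(self, key: str, default=None):
        return self._single().get(key, default)

    def set(self, key: str, value) -> None:
        self._single().set(key, value)
```

In this model an alias that resolves to nothing simply yields a container with no entries, which matches the "present but empty" behaviour: keyed lookups fail and the sugar raises ValueError.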

Context wiring (task_runner.py): both accessors are registered in get_template_context(). asset_state is set for any task with at least one asset inlet, including AssetAlias inlets.

Context TypedDict (definitions/context.py): task_state: TaskStateAccessor and asset_state: AssetStateAccessors added.

Design choices worth flagging

  1. Asset state routes use name/uri, not asset_id. Asset names and URIs are unique and available directly on the Asset object at runtime, which is consistent with /assets/by-name and /assets/by-uri and avoids a DB round trip.

  2. Wiping the entire fleet with clear() is opt-in. DELETE /state/ti/{ti_id} defaults to clearing only this task instance's map_index. Pass ?all_map_indices=true (or call task_state.clear(all_map_indices=True)) for a fleet-wide wipe.

  3. Both keyed and sugar access on asset_state. context['asset_state'][MY_ASSET] is the primary API. For single-inlet tasks (the watcher pattern), context['asset_state'].get(...) works as sugar. Consistent with inlet_events[asset].

  4. AssetAlias resolution is event-driven. The alias -> concrete asset mapping in expand_alias_to_assets() is populated when a producer emits through the alias. If the alias has never been emitted through, asset_state is present but empty.

  5. __getattr__ is not implemented. Template access like {{ task_state.job_id }} is not supported yet, but it is easy to add later.
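
As a sketch of design choice 2, the opt-in semantics of clear() can be modelled on an in-memory store. The key layout (task_id, map_index, key) and the function name are assumptions for the example; the real state lives behind the Execution API.

```python
def clear_task_state(
    store: dict, task_id: str, map_index: int, all_map_indices: bool = False
) -> int:
    """Delete state rows for one mapped task instance, or for the whole fleet.

    store maps (task_id, map_index, key) -> value. Returns the number of rows removed.
    """
    doomed = [
        row
        for row in store
        if row[0] == task_id and (all_map_indices or row[1] == map_index)
    ]
    for row in doomed:
        del store[row]
    return len(doomed)
```

Called with the default, only the rows of the calling map index go away; passing all_map_indices=True removes every mapped instance's rows for that task, which is why it is left as an explicit opt-in.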

Test plan

  • Unit: TestTaskStateAccessor, TestAssetStateAccessor, TestAssetStateAccessors in test_context.py
  • Supervisor: TestHandleRequest in test_supervisor.py — includes GetAssetsByAlias case
  • Client: TestTaskStateOperations, TestAssetStateOperations, get_by_alias cases in test_client.py
  • Integration: TestTaskInstanceStateOperations in test_task_runner.py — covers multi-inlet keyed access, AssetUriRef inlet, AssetAlias inlet resolving to concrete asset, and empty alias

Manual verification for the new asset alias endpoint

DAG:

import pendulum

from airflow.sdk import DAG, task
from airflow.sdk.definitions.asset import Asset, AssetAlias

my_asset = Asset(name="test_asset", uri="s3://bucket/test")
my_alias = AssetAlias("test_alias")

# Producer: emitting my_asset through my_alias populates the alias -> asset mapping.
with DAG("alias_producer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):

    @task(outlets=[my_alias, my_asset])
    def produce(**context):
        context["outlet_events"][my_alias].add(my_asset, extra={"run": "1"})

    produce()


# Consumer: only the alias is declared as an inlet; asset_state is keyed by the concrete asset.
with DAG("alias_consumer", schedule=None, start_date=pendulum.datetime(2026, 1, 1)):

    @task(inlets=[my_alias])
    def consume(**context):
        print("=== inlet_events ===")
        try:
            events = list(context["inlet_events"][my_alias])
            print(f"events: {events}")
        except Exception as e:  # broad on purpose: this is a manual smoke test
            print(f"inlet_events error: {e}")

        print("=== asset_state ===")
        print("asset_state:", context.get("asset_state"))

        try:
            # Works only after the producer has emitted through the alias at least once.
            state = context["asset_state"][my_asset]
            print("current watermark:", state.get("watermark"))
            state.set("watermark", "2026-05-06")
            print("set watermark to 2026-05-06")
            print("read back:", state.get("watermark"))
        except Exception as e:  # broad on purpose: this is a manual smoke test
            print(f"asset_state error: {e}")

    consume()

When I do this, this is what I get from the consumer task:

image

i.e., the producer emits through the alias, the consumer accesses context['asset_state'][my_asset], and setting and reading back the watermark works.


Was generative AI tooling used to co-author this PR?
  • Yes: Claude Sonnet 4.6


@boring-cyborg boring-cyborg Bot added the area:API and area:task-sdk labels Apr 30, 2026
Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/api/client.py Outdated
Comment thread task-sdk/src/airflow/sdk/definitions/context.py
@amoghrajesh amoghrajesh force-pushed the aip-103-3-task-sdk-comms branch from f022c48 to cb4b54a Compare May 4, 2026 05:27
@amoghrajesh amoghrajesh marked this pull request as ready for review May 4, 2026 05:28
Comment thread task-sdk/src/airflow/sdk/api/client.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
@amoghrajesh amoghrajesh requested a review from Lee-W May 5, 2026 08:07
@amoghrajesh amoghrajesh requested a review from kaxil May 5, 2026 09:31
@amoghrajesh
Contributor Author

@uranusjr + @Lee-W (not now, in your morning), it would be nice to get your reviews on this one again :)

Comment thread airflow-core/tests/unit/dag_processing/test_processor.py Outdated
Comment thread scripts/ci/prek/check_template_context_variable_in_sync.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py Outdated
Lee-W
Lee-W previously approved these changes May 6, 2026
Member

@Lee-W Lee-W left a comment

One last question: if it works, then I'm good with this PR. If not, please don't merge it yet.

Thanks for the persistence :)

@amoghrajesh
Contributor Author

@Lee-W I tested it and confirmed that asset_state was absent from the context entirely when the only inlet is an AssetAlias. I had noted this in the PR description and code comments and planned to handle it later, but since you mentioned it, I am fixing it now. AssetAlias inlets are resolved to their concrete assets at context build time via a new GetAssetsByAlias comms message, so context["asset_state"][Asset(name="a")] works when the alias resolves to that asset. For an alias that maps to nothing, asset_state is still in the context but empty. The endpoint returns 200 [] rather than 404 for unknown aliases, since it is a collection REST API endpoint.
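
A small model of the event-driven mapping and the "empty list, not an error" behaviour described here. AliasRegistry and its method names are hypothetical; in the PR the mapping comes from the existing expand_alias_to_assets() DB function, which is not reproduced in this sketch.

```python
class AliasRegistry:
    """Toy alias -> concrete asset mapping, filled only when a producer emits."""

    def __init__(self) -> None:
        self._mapping: dict = {}

    def record_emit(self, alias_name: str, asset_name: str) -> None:
        """Called when a producer emits asset_name through alias_name."""
        assets = self._mapping.setdefault(alias_name, [])
        if asset_name not in assets:
            assets.append(asset_name)

    def resolve(self, alias_name: str) -> list:
        """Return the concrete assets for an alias; unknown aliases give []."""
        return list(self._mapping.get(alias_name, []))
```

Treating an unknown alias as an empty collection means the context builder needs no special error path: it just ends up with an asset_state container that has no entries.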

@Lee-W Lee-W dismissed their stale review May 6, 2026 12:59

Thanks for checking. Revoking the approval to avoid accidental merging.

@amoghrajesh
Contributor Author

Added the GET /assets/by-alias Execution API endpoint so that AssetAlias inlets can be resolved to their concrete assets at context build time. Without this, the worker only knows the alias name, not which concrete assets the alias maps to, so context["asset_state"][my_asset] had no way to work. The alias -> asset mapping is event-driven (populated when a producer emits through the alias), so asset_state is present but empty if the alias hasn't been used yet. Handled in: ability to fetch asset_state for aliases too

@amoghrajesh
Contributor Author

(Hopefully the PR hasn't grown too far out of scope by now.)

Member

@jason810496 jason810496 left a comment

Minor nits that can be addressed in a follow-up; non-blocking. Thanks.

Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py
Comment thread task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@amoghrajesh amoghrajesh added the full tests needed label May 7, 2026
@amoghrajesh
Contributor Author

Ah, I want to run this through the full test suite.

@amoghrajesh amoghrajesh closed this May 7, 2026
@github-project-automation github-project-automation Bot moved this from In progress to Done in AIP-103: Task State Management May 7, 2026
@amoghrajesh amoghrajesh reopened this May 7, 2026
@amoghrajesh
Contributor Author

Thanks for the reviews, folks. Merging this one; I will follow up with the next items.

@amoghrajesh amoghrajesh merged commit 6d51744 into apache:main May 7, 2026
270 of 281 checks passed
@amoghrajesh amoghrajesh deleted the aip-103-3-task-sdk-comms branch May 7, 2026 10:42

Labels

area:API, area:task-sdk, full tests needed

Projects

Development

Successfully merging this pull request may close these issues.

Expose task state and asset state to tasks via context vars

5 participants