
Thread version_data through BundleInfo to worker-side bundle initialization #67217

Open
o-nikolas wants to merge 7 commits into apache:main from aws-mwaa:onikolas/pr2/s3-bundle-version-worker-flow

Conversation

@o-nikolas
Contributor

@o-nikolas o-nikolas commented May 19, 2026

This is PR 2 of the S3 Dag Bundle versioning series. PR 1 (#66491) added the BundleVersion dataclass, Alembic migration, and persistence path. This PR completes the worker-side plumbing so that version data reaches the bundle instance at task execution time.

Adds version_data to BundleInfo and threads it through the worker-side bundle initialization path so that structured version metadata (e.g., S3 manifests) reaches the bundle at task execution time.

Changes:

  • BundleInfo gains version_data: dict | None = None field
  • ExecuteTask.make() reads version_data from DagVersion (via eagerly-loaded relationship)
  • Scheduler query adds selectinload(TI.dag_version) to avoid N+1 queries
  • BaseDagBundle.__init__ accepts and stores version_data
  • DagBundlesManager.get_bundle() passes version_data to the bundle constructor
  • task_runner.parse() and callback_supervisor pass version_data through
  • Task SDK _generated.py updated with the new field

related: #66491
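
A minimal sketch of the plumbing listed above, using plain dataclasses as stand-ins for the real Airflow classes. Only the names `BundleInfo`, `BaseDagBundle`, and `version_data` come from this PR; the class bodies, the `get_bundle` helper, and the manifest contents are assumptions for illustration, and the real classes may differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class BundleInfo:
    """Stand-in for the workload's BundleInfo (the real class may differ)."""

    name: str
    version: str | None = None
    version_data: dict[str, Any] | None = None  # new field, defaults to None


class BaseDagBundle:
    """Stand-in for the bundle base class: accepts and stores version_data."""

    def __init__(
        self,
        *,
        name: str,
        version: str | None = None,
        version_data: dict[str, Any] | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.version_data = version_data


def get_bundle(info: BundleInfo) -> BaseDagBundle:
    """Hypothetical helper: forward all three values to the bundle constructor."""
    return BaseDagBundle(name=info.name, version=info.version, version_data=info.version_data)


# Hypothetical manifest contents; the real S3 manifest format is not shown in this PR.
info = BundleInfo(name="s3-dags", version="v1", version_data={"dags/example.py": "abc123"})
bundle = get_bundle(info)

# A bundle that never receives version_data keeps the default of None.
legacy = get_bundle(BundleInfo(name="local-dags"))
```

Because the new field defaults to `None` at every layer, bundles that do not care about structured version metadata should be unaffected.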


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4)

Generated-by: Claude Code (Opus 4) following the guidelines

Contributor

@ferruzzi ferruzzi left a comment

Do we want any unit tests? It's just plumbing a value through, so maybe not necessary. Feels pretty trivial on this PR so I'll approve but felt odd not calling it out.

Comment thread airflow-core/src/airflow/dag_processing/bundles/base.py Outdated
@o-nikolas o-nikolas force-pushed the onikolas/pr2/s3-bundle-version-worker-flow branch from 97dc507 to 48a9b1b Compare May 20, 2026 08:23
@o-nikolas o-nikolas force-pushed the onikolas/pr2/s3-bundle-version-worker-flow branch from e5e65a1 to a5bc3b2 Compare May 21, 2026 18:35
@o-nikolas
Contributor Author

@ashb and @amoghrajesh You might be interested in this one.

if not bundle_info:
    version_data = None
    if ti.dag_version is not None:
        version_data = ti.dag_version.version_data
Member

Why is this read off ti, when the other things just below are read from ti.dag_model.bundle*?

Member

Worth flagging that even off the same ti, version and version_data can disagree. Two cases:

  1. Unpinned runs (disable_bundle_versioning=True): dag_run.bundle_version is None, but ti.dag_version.version_data may still carry a manifest -- so version=None, version_data={...}.
  2. After _verify_integrity_if_dag_changed (scheduler_job_runner.py:2521-2530): TI's dag_version_id is bumped to the latest version while dag_run.bundle_version is left untouched, so version_data describes a newer version than version reports.

The scheduler picks a deliberate rule for bundle_version at scheduler_job_runner.py:1438-1442; worth deciding the equivalent rule for version_data here (e.g., is it valid to expose version_data when version is None?).

Contributor Author

@ashb - What we're passing along here is the version that is being used and the bundle to use to grab it. The bundle itself isn't tracking the current version of the dag, that's on the dag_version. So once on the worker side, we'll ask the bundle to load the state of the bundle for the specific version the dag is trying to run with. So properties from these two things are combined later. Does that make sense?

@kaxil Fair point, I think the right rule is: version_data should only be populated when the run is pinned (i.e., dag_run.bundle_version is not None). If the run is unpinned, the worker should use the latest bundle state anyway, so sending stale version_data would be misleading. I'll add a guard:

version_data = None
if ti.dag_version is not None and ti.dag_run.bundle_version is not None:
    version_data = ti.dag_version.version_data 

This mirrors the existing rule for bundle_version at scheduler_job_runner.py:1438-1442.

For the second case, version_data will still come from the ti's dag_version (which may be newer than the run's), but since the run is pinned, the bundle will use version_data to check out the right code regardless. If we want stricter alignment (version_data always matches bundle_version), we'd need to look up the DagVersion by the bundle_version string rather than using ti.dag_version. That adds another query, and I think the current ti-based approach is correct for the worker's needs. Thoughts?
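
A small sketch of the guard proposed above, with dataclass stand-ins for the ORM objects. The `resolve_version_data` function name, the stand-in classes, and the manifest contents are hypothetical; only the condition itself is taken from the snippet in this comment.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class DagVersion:
    version_data: dict[str, Any] | None = None


@dataclass
class DagRun:
    bundle_version: str | None = None


@dataclass
class TaskInstance:
    dag_run: DagRun
    dag_version: DagVersion | None = None


def resolve_version_data(ti: TaskInstance) -> dict[str, Any] | None:
    """Expose version_data only for pinned runs (bundle_version is set)."""
    if ti.dag_version is not None and ti.dag_run.bundle_version is not None:
        return ti.dag_version.version_data
    return None


manifest = {"dags/example.py": "abc123"}  # hypothetical manifest contents

pinned = TaskInstance(DagRun(bundle_version="v1"), DagVersion(manifest))
unpinned = TaskInstance(DagRun(bundle_version=None), DagVersion(manifest))
no_version = TaskInstance(DagRun(bundle_version="v1"), None)
```

Under this rule the unpinned case from the review (version=None, version_data={...}) can no longer occur. The second case, where the TI's dag_version is newer than the run's bundle_version, is not addressed by the guard.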

Comment thread airflow-core/src/airflow/executors/workloads/types.py Outdated
ranked_query.c.map_index_for_ordering,
)
.options(selectinload(TI.dag_model))
.options(selectinload(TI.dag_version))
Member

Hmm, joins aren't free, and this isn't used for most places.

I'm wondering if this needs to be based on what the bundle backend needs somehow?

Contributor Author

selectinload issues a single additional SELECT batched across all TIs in the result set (not one query per TI), so the cost is one extra round-trip per scheduler loop iteration rather than per TI. It's hard to make it conditional because the result set mixes TIs from versioned and unversioned bundles. We could lazy-load later, but that becomes one request per TI. I think the impact here is pretty minimal given the single extra batched SELECT (no new joins are happening here)?
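
To make the round-trip argument concrete, here is a rough sketch that imitates the two query patterns with the standard-library sqlite3 module rather than SQLAlchemy. The table layout and row values are invented for illustration; the batched branch only approximates the shape of the statement that selectinload emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_version (id INTEGER PRIMARY KEY, version_data TEXT);
    CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_version_id INTEGER);
    INSERT INTO dag_version VALUES (1, '{"k": "a"}'), (2, '{"k": "b"}');
    INSERT INTO task_instance VALUES (10, 1), (11, 1), (12, 2), (13, 2);
    """
)

statements = []
conn.set_trace_callback(statements.append)  # record every SQL statement that runs

tis = conn.execute("SELECT id, dag_version_id FROM task_instance").fetchall()

# Lazy-load pattern: one extra SELECT per task instance.
statements.clear()
for _ti_id, dv_id in tis:
    conn.execute("SELECT version_data FROM dag_version WHERE id = ?", (dv_id,)).fetchone()
lazy_count = len(statements)

# Batched pattern: one extra SELECT ... IN (...) for the whole result set.
statements.clear()
dv_ids = sorted({dv_id for _ti_id, dv_id in tis})
placeholders = ", ".join("?" for _ in dv_ids)
rows = conn.execute(
    f"SELECT id, version_data FROM dag_version WHERE id IN ({placeholders})", dv_ids
).fetchall()
version_data_by_id = dict(rows)
batched_count = len(statements)
```

The lazy pattern grows with the number of task instances, while the batched pattern stays at one statement regardless of how many rows the scheduler query returns.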

Comment on lines +962 to +966
version_data:
  anyOf:
  - additionalProperties: true
    type: object
  - type: 'null'
Member

If the field is present it should be an object.

(I.e. it's either not sent, or it's an {...} object.)

Contributor Author

The version field above uses the same approach as this one. Happy to change it here, but I'd leave version as is. @ashb?

Contributor Author

Keeping the field off the wire when it is None/null ended up needing a model serializer on workloads.base.BundleInfo. That seems maybe not worth it to me, and keeping it the same as version above seems fine. Thoughts @ashb?
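
The two wire shapes under discussion can be compared with a short sketch. It uses a plain dataclass and `json` as stand-ins for the real Pydantic model and its serializer; the helper function names are hypothetical.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class BundleInfo:
    """Stand-in for the real model, which is not shown here."""

    name: str
    version: str | None = None
    version_data: dict[str, Any] | None = None


def dump_with_nulls(info: BundleInfo) -> str:
    """Nullable shape: optional fields are always present, possibly null."""
    return json.dumps(asdict(info), sort_keys=True)


def dump_omitting_version_data(info: BundleInfo) -> str:
    """Object-or-absent shape: drop version_data when it is None."""
    payload = asdict(info)
    if payload["version_data"] is None:
        del payload["version_data"]
    return json.dumps(payload, sort_keys=True)


info = BundleInfo(name="s3-dags", version="v1")
with_nulls = dump_with_nulls(info)
omitted = dump_omitting_version_data(info)
```

The second shape matches the "not sent, or an object" schema, at the cost of custom serialization logic on the model.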

Comment thread providers/celery/tests/unit/celery/executors/test_celery_executor.py Outdated
Comment thread airflow-core/src/airflow/dag_processing/bundles/base.py
o-nikolas added 7 commits May 25, 2026 17:16
…zation

Add version_data to the push path so structured bundle metadata (e.g.,
S3 manifests) reaches workers at task execution time.

Changes:
- Add version_data field to BundleInfo (workloads/base.py)
- Populate version_data from DagVersion in ExecuteTask.make()
- Add selectinload(TI.dag_version) to scheduler enqueue query to
  avoid N+1 queries when reading version_data
- Add version_data parameter to BaseDagBundle.__init__ (stored as
  self.version_data) and DagBundlesManager.get_bundle()
- Pass version_data through task_runner.py and callback_supervisor.py
- Regenerate task-sdk datamodels to include version_data in BundleInfo

Existing bundles ignore version_data (defaults to None). The S3 bundle
will use self.version_data in initialize() to fetch specific object
versions (follow-up PR).
Address review feedback:
- Use dict[str, Any] | None instead of bare dict | None for version_data
  in both BaseDagBundle.__init__ and BundleInfo
- Add minimal tests verifying version_data plumbing through the bundle
  constructor
…stanceKey types

The strict TypeError for unknown key types broke executor tests that pass
Mock objects or raw tuples as keys (Lambda, Batch, ECS, Kubernetes).
Restore the original fallback to CallbackState for any non-TaskInstanceKey,
matching main's behavior.
…pat with 3.2.x

The test_process_workloads_routes_execute_callback test uses
CallbackKey(id=...) which requires the dataclass form introduced in
3.3. In Airflow 3.2.x, CallbackKey is a str type alias and does not
accept keyword arguments. Change the skipif guard from
AIRFLOW_V_3_2_PLUS to AIRFLOW_V_3_3_PLUS.
…rialization

- Only populate version_data when dag_run.bundle_version is not None,
  mirroring the pinning rule for bundle_version (kaxil feedback)
- Add model_serializer to BundleInfo so version_data is absent (not null)
  on the wire when None (ashb feedback)
- Update edge OpenAPI spec: version_data is type:object, not anyOf with null
- Add :param version_data: to BaseDagBundle docstring (kaxil feedback)
- Remove unrelated changes from bad rebase (types.py, celery test) that
  were fixed separately in apache#66973 (ashb feedback)
…generate artifacts

The model_serializer(mode='wrap') on BundleInfo caused Pydantic to
lose JSON schema information, making the OpenAPI generator produce a
generic 'additionalProperties: true, type: object' instead of the
full BundleInfo schema with name/version/version_data fields.

Removing the custom serializer restores correct schema generation.
The version_data field is Optional so receivers already handle null.

Also regenerates:
- edge OpenAPI spec (v2-edge-generated.yaml)
- supervisor schema snapshot (schema.json)
- uv.lock (reflects upstream dependency changes after rebase)
@o-nikolas o-nikolas force-pushed the onikolas/pr2/s3-bundle-version-worker-flow branch from 3fb016b to 87dd9a8 Compare May 26, 2026 00:21

Labels

area:DAG-processing, area:Executors-core (LocalExecutor & SequentialExecutor), area:Scheduler (including HA (high availability) scheduler), area:task-sdk

Projects

None yet

6 participants