
Thread version_data through BundleInfo to worker-side bundle initialization #67217

Open
o-nikolas wants to merge 8 commits into apache:main from aws-mwaa:onikolas/pr2/s3-bundle-version-worker-flow


@o-nikolas
Contributor

This is PR 2 of the S3 Dag Bundle versioning series. PR 1 (#66491) added the BundleVersion dataclass, the Alembic migration, and the persistence path. This PR completes the worker-side plumbing: it adds a version_data field to BundleInfo and threads it through the worker-side bundle initialization path, so that structured version metadata (e.g., S3 manifests) reaches the bundle instance at task execution time.

Changes:

  • BundleInfo gains a version_data: dict[str, Any] | None = None field
  • ExecuteTask.make() reads version_data from the DagVersion (via the eagerly loaded relationship)
  • Scheduler query adds selectinload(TI.dag_version) to avoid N+1 queries
  • BaseDagBundle.__init__ accepts and stores version_data
  • DagBundlesManager.get_bundle() passes version_data to the bundle constructor
  • task_runner.parse() and callback_supervisor pass version_data through
  • Task SDK datamodels (_generated.py) regenerated to include the new field
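The changes above amount to threading one optional field from the workload descriptor into the bundle constructor. The sketch below shows that data flow under simplifying assumptions: BundleInfo, BaseDagBundle, and DagBundlesManager are plain-Python stand-ins that keep only the fields relevant here (they are not Airflow's real classes or signatures), and ManifestBundle, init_bundle_for_task, and the {"objects": {key: version_id}} shape of version_data are hypothetical, chosen only to show how a version-aware bundle might consume the field.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class BundleInfo:
    """Stand-in for the bundle descriptor sent with a workload."""

    name: str
    version: str | None = None
    # The new optional field: structured metadata such as an S3 manifest.
    version_data: dict[str, Any] | None = None


class BaseDagBundle:
    """Stand-in base class: stores version_data so subclasses can use it later."""

    def __init__(
        self,
        *,
        name: str,
        version: str | None = None,
        version_data: dict[str, Any] | None = None,
    ) -> None:
        self.name = name
        self.version = version
        self.version_data = version_data

    def initialize(self) -> None:
        """Bundles that predate version_data simply never read it."""


class ManifestBundle(BaseDagBundle):
    """Hypothetical version-aware bundle that pins object versions at init time."""

    def initialize(self) -> None:
        # Assumed payload shape: {"objects": {object_key: object_version_id}}
        objects = (self.version_data or {}).get("objects", {})
        self.pinned = dict(objects)


class DagBundlesManager:
    """Stand-in manager: maps bundle names to classes and builds instances."""

    def __init__(self, registry: dict[str, type[BaseDagBundle]]) -> None:
        self._registry = registry

    def get_bundle(
        self,
        name: str,
        version: str | None = None,
        version_data: dict[str, Any] | None = None,
    ) -> BaseDagBundle:
        bundle_cls = self._registry[name]
        # version_data is forwarded unchanged to the bundle constructor.
        return bundle_cls(name=name, version=version, version_data=version_data)


def init_bundle_for_task(info: BundleInfo, manager: DagBundlesManager) -> BaseDagBundle:
    """Worker side (hypothetical helper): turn a received BundleInfo into a ready bundle."""
    bundle = manager.get_bundle(info.name, version=info.version, version_data=info.version_data)
    bundle.initialize()
    return bundle


manager = DagBundlesManager({"s3": ManifestBundle, "local": BaseDagBundle})
s3_info = BundleInfo(name="s3", version="v42", version_data={"objects": {"dags/etl.py": "abc123"}})
print(init_bundle_for_task(s3_info, manager).pinned)  # {'dags/etl.py': 'abc123'}
print(init_bundle_for_task(BundleInfo(name="local"), manager).version_data)  # None
```

Because version_data defaults to None at every hop, a bundle that never reads it behaves as before, consistent with the note that existing bundles ignore the field.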

related: #66491


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4)

Generated-by: Claude Code (Opus 4) following the guidelines


@ferruzzi left a comment


Do we want any unit tests? It's just plumbing a value through, so maybe they're not necessary. It feels pretty trivial on this PR, so I'll approve, but it felt odd not to call it out.

Review thread on airflow-core/src/airflow/dag_processing/bundles/base.py (outdated)
o-nikolas added 3 commits May 20, 2026 01:23
…zation

Add version_data to the push path so structured bundle metadata (e.g.,
S3 manifests) reaches workers at task execution time.

Changes:
- Add version_data field to BundleInfo (workloads/base.py)
- Populate version_data from DagVersion in ExecuteTask.make()
- Add selectinload(TI.dag_version) to scheduler enqueue query to
  avoid N+1 queries when reading version_data
- Add version_data parameter to BaseDagBundle.__init__ (stored as
  self.version_data) and DagBundlesManager.get_bundle()
- Pass version_data through task_runner.py and callback_supervisor.py
- Regenerate task-sdk datamodels to include version_data in BundleInfo

Existing bundles ignore version_data (defaults to None). The S3 bundle
will use self.version_data in initialize() to fetch specific object
versions (follow-up PR).
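Reading version_data for every queued task instance is where the N+1 risk mentioned in this commit comes from: lazily loading each task instance's DagVersion costs one extra query per row. The sketch below illustrates the difference with the standard-library sqlite3 module and a toy two-table schema. The table and column names are assumptions rather than Airflow's schema, and the select-in strategy is written out by hand (one follow-up SELECT ... WHERE id IN (...)) to mirror the kind of query SQLAlchemy's selectinload emits, not to reproduce the scheduler's actual query.

```python
import json
import sqlite3


class CountingConn:
    """Wraps a sqlite3 connection and counts every query issued through it."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.queries = 0

    def query(self, sql: str, params: tuple = ()) -> list:
        self.queries += 1
        return self.conn.execute(sql, params).fetchall()


def make_db() -> CountingConn:
    """Toy schema: six task instances pointing at two Dag versions."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE dag_version (id INTEGER PRIMARY KEY, version_data TEXT);
        CREATE TABLE task_instance (id INTEGER PRIMARY KEY, dag_version_id INTEGER);
        """
    )
    conn.executemany(
        "INSERT INTO dag_version VALUES (?, ?)",
        [(1, json.dumps({"manifest": "v1"})), (2, json.dumps({"manifest": "v2"}))],
    )
    # Odd task instance ids use version 1, even ids use version 2.
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?)",
        [(i, 1 if i % 2 else 2) for i in range(1, 7)],
    )
    return CountingConn(conn)  # setup statements above are not counted


def load_lazy(db: CountingConn) -> dict:
    """N+1 pattern: one query for the rows, then one query per row."""
    tis = db.query("SELECT id, dag_version_id FROM task_instance ORDER BY id")
    result = {}
    for ti_id, dv_id in tis:
        (row,) = db.query("SELECT version_data FROM dag_version WHERE id = ?", (dv_id,))
        result[ti_id] = json.loads(row[0])
    return result


def load_selectin(db: CountingConn) -> dict:
    """Select-in pattern: one query for the rows, one IN query for all versions."""
    tis = db.query("SELECT id, dag_version_id FROM task_instance ORDER BY id")
    ids = tuple(sorted({dv_id for _, dv_id in tis}))
    placeholders = ",".join("?" * len(ids))
    rows = db.query(f"SELECT id, version_data FROM dag_version WHERE id IN ({placeholders})", ids)
    by_id = {dv_id: json.loads(data) for dv_id, data in rows}
    return {ti_id: by_id[dv_id] for ti_id, dv_id in tis}


lazy_db, eager_db = make_db(), make_db()
assert load_lazy(lazy_db) == load_selectin(eager_db)
print(lazy_db.queries, eager_db.queries)  # 7 2
```

With six task instances sharing two Dag versions, lazy loading issues seven queries while select-in loading issues two, and the IN list grows with the number of distinct versions rather than the number of task instances.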

Address review feedback:
- Use dict[str, Any] | None instead of bare dict | None for version_data
  in both BaseDagBundle.__init__ and BundleInfo
- Add minimal tests verifying version_data plumbing through the bundle
  constructor
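The minimal constructor tests described in this commit could look roughly like the following unittest sketch. BaseDagBundle here is a simplified stand-in rather than Airflow's class, and LegacyBundle is a hypothetical subclass that forwards **kwargs to its parent; the sketch checks the default, the stored value, and pass-through from a subclass written before the field existed.

```python
from __future__ import annotations

import unittest
from typing import Any


class BaseDagBundle:
    """Simplified stand-in for the real base class (not Airflow's code)."""

    def __init__(self, *, name: str, version_data: dict[str, Any] | None = None) -> None:
        self.name = name
        self.version_data = version_data


class LegacyBundle(BaseDagBundle):
    """Hypothetical subclass that predates version_data and forwards **kwargs."""

    def __init__(self, *, path: str, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.path = path


class VersionDataPlumbingTest(unittest.TestCase):
    def test_defaults_to_none(self) -> None:
        self.assertIsNone(BaseDagBundle(name="b").version_data)

    def test_stored_as_given(self) -> None:
        data = {"objects": {"dags/a.py": "v1"}}
        self.assertEqual(BaseDagBundle(name="b", version_data=data).version_data, data)

    def test_forwarded_through_subclass_kwargs(self) -> None:
        bundle = LegacyBundle(path="/tmp/dags", name="b", version_data={"k": "v"})
        self.assertEqual(bundle.version_data, {"k": "v"})
        self.assertEqual(bundle.path, "/tmp/dags")
```

A runner such as pytest collects unittest.TestCase classes directly, so no separate entry point is needed.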
o-nikolas force-pushed the onikolas/pr2/s3-bundle-version-worker-flow branch from 97dc507 to 48a9b1b on May 20, 2026 08:23
o-nikolas added 5 commits May 20, 2026 10:00
…nc signature

The mypy providers check failed because execute_async declared
key as 'TaskInstanceKey | str' but BatchQueuedJob.key expects
BatchJobWorkloadKey (TaskInstanceKey | CallbackKey). Although
CallbackKey is aliased to str, mypy treats the nominal type alias
as distinct. Use the proper CallbackKey import to satisfy mypy.

…stanceKey types

The strict TypeError for unknown key types broke executor tests that pass
Mock objects or raw tuples as keys (Lambda, Batch, ECS, Kubernetes).
Restore the original fallback to CallbackState for any non-TaskInstanceKey,
matching main's behavior.
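The fallback restored in this commit can be pictured as a type check with a permissive default. In the sketch below, TaskInstanceKey, TaskInstanceState, and CallbackState are simplified stand-ins and state_enum_for is a hypothetical helper; the real types and the executor code that performs this check are not shown in this PR. TaskInstanceKey is modeled as a NamedTuple, so a raw tuple with identical values compares equal to a key but still fails the isinstance check and falls back to CallbackState.

```python
from __future__ import annotations

from enum import Enum
from typing import Any, NamedTuple


class TaskInstanceKey(NamedTuple):
    """Stand-in key: only the fields needed for this example."""

    dag_id: str
    task_id: str
    run_id: str


class TaskInstanceState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


class CallbackState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


def state_enum_for(key: Any) -> type[Enum]:
    """Hypothetical helper: choose the state enum for an executor event key.

    Only a real TaskInstanceKey maps to TaskInstanceState. Everything else
    (callback keys, raw tuples, Mock objects in tests) falls back to
    CallbackState instead of raising TypeError.
    """
    if isinstance(key, TaskInstanceKey):
        return TaskInstanceState
    return CallbackState
```

The permissive default trades strictness for compatibility: an unexpected key type no longer fails loudly, which is what the executor tests that pass mocks or raw tuples rely on.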

…pat with 3.2.x

The test_process_workloads_routes_execute_callback test uses
CallbackKey(id=...) which requires the dataclass form introduced in
3.3. In Airflow 3.2.x, CallbackKey is a str type alias and does not
accept keyword arguments. Change the skipif guard from
AIRFLOW_V_3_2_PLUS to AIRFLOW_V_3_3_PLUS.

Labels

area:DAG-processing, area:Executors-core (LocalExecutor & SequentialExecutor), area:Scheduler (including HA (high availability) scheduler), area:task-sdk


3 participants