Skip to content

Commit

Permalink
AIP-72 Add Task Scheduling Metadata to TaskInstance (apache#45008)
Browse files Browse the repository at this point in the history
* Extend the queue_workload() call with an ORM session

* Add support for priority, queue and pool slots in AIP-72 task instance

* Fix pytests

* Fix pytests

* Fix pytests

* Update changelog

* Ruff
  • Loading branch information
jscheffl authored and HariGS-DB committed Jan 16, 2025
1 parent cea6e4f commit 48b7900
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 4 deletions.
4 changes: 4 additions & 0 deletions airflow/executors/workloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class TaskInstance(BaseModel):
try_number: int
map_index: int | None = None

pool_slots: int
queue: str
priority_weight: int

# TODO: Task-SDK: Can we replace TastInstanceKey with just the uuid across the codebase?
@property
def key(self) -> TaskInstanceKey:
Expand Down
8 changes: 8 additions & 0 deletions providers/src/airflow/providers/edge/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

0.10.1pre0
..........

Misc
~~~~

* ``Re-add the feature to support pool slots in concurrency calculation for Airflow 3.``

0.10.0pre0
..........

Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

__all__ = ["__version__"]

__version__ = "0.10.0pre0"
__version__ = "0.10.1pre0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.10.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ def queue_workload(
map_index=key.map_index,
try_number=key.try_number,
state=TaskInstanceState.QUEUED,
queue=DEFAULT_QUEUE, # TODO Queues to be added once implemented in AIP-72
concurrency_slots=1, # TODO Pool slots to be added once implemented in AIP-72
queue=task_instance.queue,
concurrency_slots=task_instance.pool_slots,
command=workload.model_dump_json(),
)
)
Expand Down
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/edge/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ source-date-epoch: 1729683247

# note that those versions are maintained by release manager - do not update them manually
versions:
- 0.10.0pre0
- 0.10.1pre0

dependencies:
- apache-airflow>=2.10.0
Expand Down
3 changes: 3 additions & 0 deletions providers/tests/edge/cli/test_edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
"dag_id": "dummy",
"run_id": "dummy",
"try_number": 1,
"pool_slots": 1,
"queue": "default",
"priority_weight": 1,
},
"dag_path": "dummy.py",
"log_path": "dummy.log",
Expand Down
3 changes: 3 additions & 0 deletions providers/tests/edge/executors/test_edge_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ def test_queue_workload(self):
dag_id="dummy",
run_id="dummy",
try_number=1,
pool_slots=1,
queue="default",
priority_weight=1,
),
dag_path="dummy.py",
log_path="dummy.log",
Expand Down
3 changes: 3 additions & 0 deletions tests/executors/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def _test_execute(self, mock_supervise, parallelism=1):
run_id="run1",
try_number=1,
state="queued",
pool_slots=1,
queue="default",
priority_weight=1,
)
for i in range(self.TEST_SUCCESS_COMMANDS)
]
Expand Down

0 comments on commit 48b7900

Please sign in to comment.