Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ def date_param():
# Plugins command
"plugins list",
"plugins list-import-errors",
# Tasks commands
"tasks clear example_bash_operator",
]

NO_AUTH_TEST_COMMANDS = [
Expand Down
1 change: 1 addition & 0 deletions airflow-ctl/docs/images/command_hashes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ providers:34502fe09dc0b8b0a13e7e46efdffda6
variables:f8fc76d3d398b2780f4e97f7cd816646
version:31f4efdf8de0dbaaa4fac71ff7efecc3
plugins:4864fd8f356704bd2b3cd1aec3567e35
tasks:8ca7306be97d1c8788dbfbe4b0f8bf61
auth login:9fe2bb1dd5c602beea2eefb33a2b20a8
98 changes: 98 additions & 0 deletions airflow-ctl/docs/images/output_tasks.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 7 additions & 0 deletions airflow-ctl/src/airflowctl/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
PoolsOperations,
ProvidersOperations,
ServerResponseError,
TasksOperations,
VariablesOperations,
VersionOperations,
XComOperations,
Expand Down Expand Up @@ -473,6 +474,12 @@ def plugins(self):
"""Operations related to plugins."""
return PluginsOperations(self)

@lru_cache() # type: ignore[prop-decorator]
@property
def tasks(self):
"""Operations related to tasks."""
return TasksOperations(self)


# API Client Decorator for CLI Actions
@contextlib.contextmanager
Expand Down
17 changes: 17 additions & 0 deletions airflow-ctl/src/airflowctl/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
BulkBodyPoolBody,
BulkBodyVariableBody,
BulkResponse,
ClearTaskInstancesBody,
Config,
ConnectionBody,
ConnectionCollectionResponse,
Expand Down Expand Up @@ -68,6 +69,7 @@
ProviderCollectionResponse,
QueuedEventCollectionResponse,
QueuedEventResponse,
TaskInstanceCollectionResponse,
TriggerDAGRunPostBody,
VariableBody,
VariableCollectionResponse,
Expand Down Expand Up @@ -646,6 +648,21 @@ def list(
raise e


class TasksOperations(BaseOperations):
"""Tasks operations."""

def clear(
self, dag_id: str, body: ClearTaskInstancesBody
) -> TaskInstanceCollectionResponse | ServerResponseError:
"""Clear task instances."""
try:
payload = body.model_dump(exclude_unset=True, by_alias=True)
self.response = self.client.post(f"/dags/{dag_id}/clearTaskInstances", json=payload)
return TaskInstanceCollectionResponse.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e


class JobsOperations(BaseOperations):
"""Job operations."""

Expand Down
2 changes: 1 addition & 1 deletion airflow-ctl/src/airflowctl/ctl/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def __init__(self, file_path: str | Path | None = None):
# Exclude parameters that are not needed for CLI from datamodels
self.excluded_parameters = ["schema_"]
# This list is used to determine if the command/operation needs to output data
self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit"]
self.output_command_list = ["list", "get", "create", "delete", "update", "trigger", "add", "edit", "clear"]
self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"]
self.exclude_method_names = [
"error",
Expand Down
4 changes: 4 additions & 0 deletions airflow-ctl/src/airflowctl/ctl/help_texts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ xcom:
plugins:
list: "List all installed Airflow plugins"
list-import-errors: "List all plugin import errors"

tasks:
clear: "Clear task instances"

21 changes: 21 additions & 0 deletions airflow-ctl/tests/airflow_ctl/api/test_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,27 @@ def handle_request(request: httpx.Request) -> httpx.Response:
assert response == self.dag_run_response


class TestTasksOperations:
dag_id = "dag_id"
task_instance_response = Mock()

def test_clear(self):
from airflowctl.api.datamodels.generated import ClearTaskInstancesBody, TaskInstanceCollectionResponse

clear_body = ClearTaskInstancesBody(dry_run=True, task_ids=["task_1", "task_2"])
expected_response = TaskInstanceCollectionResponse(task_instances=[], total_entries=0)

def handle_request(request: httpx.Request) -> httpx.Response:
assert request.url.path == f"/api/v2/dags/{self.dag_id}/clearTaskInstances"
# Ensure body is correctly serialized
assert json.loads(request.content.decode()) == clear_body.model_dump(exclude_unset=True, by_alias=True)
return httpx.Response(200, json=json.loads(expected_response.model_dump_json()))

client = make_api_client(transport=httpx.MockTransport(handle_request))
response = client.tasks.clear(dag_id=self.dag_id, body=clear_body)
assert response == expected_response


class TestDagRunOperations:
dag_id = "dag_id"
dag_run_id = "dag_run_id"
Expand Down
1 change: 1 addition & 0 deletions scripts/in_container/run_capture_airflowctl_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"variables",
"version",
"plugins",
"tasks",
]

SUBCOMMANDS = [
Expand Down