diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index ceb21a6e056c6..2147bbf181600 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -111,6 +111,8 @@ def date_param(): "pools export tests/airflowctl_tests/fixtures/pools_export.json --output=json", "pools delete --pool=test_pool", "pools delete --pool=test_import_pool", + # Tasks commands + 'tasks state --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', # Providers commands "providers list", # Variables commands diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8e590f3b820cd..8ef8998fbc9f0 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,4 +1,4 @@ -main:27a22c00dcf32e7a1a4f06672dc8e3c8 +main:0c1b0d44d83b049c1e5d9f56ece76c6d assets:70619a2d92bda80930cde2aefcd8e1cd auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index f586877bce8eb..7c5a66a1aefe1 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass token from -environment variable/parameter or pass username and -password. -backfillPerform Backfill operations -configPerform Config operations -connectionsPerform Connections operations -dagrunPerform DagRun operations -dagsPerform Dags operations -jobsPerform Jobs operations -pluginsPerform Plugins operations -poolsPerform Pools operations -providersPerform Providers operations -variablesPerform Variables operations -xcomPerform XCom operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage: airflowctl [-h] GROUP_OR_COMMAND ... + +Positional Arguments: +  GROUP_OR_COMMAND + +    Groups +      assets        Perform Assets operations +      auth          Manage authentication for CLI. Either pass token from +                    environment variable/parameter or pass username and +                    password. +      backfill      Perform Backfill operations +      config        Perform Config operations +      connections   Perform Connections operations +      dagrun        Perform DagRun operations +      dags          Perform Dags operations +      jobs          Perform Jobs operations +      plugins       Perform Plugins operations +      pools         Perform Pools operations +      providers     Perform Providers operations +      tasks         Manage Airflow task instances +      variables     Perform Variables operations +      xcom          Perform XCom operations + +    Commands: +      version       Show version information + +Options: +  -h, --help        show this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index aa96304f501fb..7afe955077b79 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -267,6 +267,27 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: type=str, help="The Dag ID of the Dag to pause or unpause", ) +ARG_DAG_ID_OPTION = Arg( + flags=("--dag-id",), + type=str, + dest="dag_id", + required=True, + help="The Dag ID", +) +ARG_DAG_RUN_ID = Arg( + flags=("--dag-run-id",), + type=str, + dest="dag_run_id", + required=True, + help="The Dag run ID", +) +ARG_TASK_ID_OPTION = Arg( + flags=("--task-id",), + type=str, + dest="task_id", + required=True, + help="The task ID", +) ARG_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), @@ -953,6 +974,15 @@ def merge_commands( ), ) +TASK_COMMANDS = ( + ActionCommand( + name="state", + help="Get the state of a task instance", + func=lazy_load_command("airflowctl.ctl.commands.task_command.state"), + args=(ARG_DAG_ID_OPTION, ARG_DAG_RUN_ID, ARG_TASK_ID_OPTION, ARG_OUTPUT), + ), +) + core_commands: list[CLICommand] = [ GroupCommand( name="auth", @@ -980,6 +1010,11 @@ def merge_commands( help="Manage Airflow pools", subcommands=POOL_COMMANDS, ), + GroupCommand( + name="tasks", + help="Manage Airflow task instances", + subcommands=TASK_COMMANDS, + ), ActionCommand( name="version", help="Show version information", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/task_command.py b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py new file mode 100644 index 0000000000000..b664d455a89f9 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client +from airflowctl.api.datamodels.generated import TaskInstanceResponse +from airflowctl.ctl.console_formatting import AirflowConsole + + +@provide_api_client(kind=ClientKind.CLI) +def state(args, api_client=NEW_API_CLIENT) -> None: + """Get the state of a task instance.""" + response = api_client.get(f"dags/{args.dag_id}/dagRuns/{args.dag_run_id}/taskInstances/{args.task_id}") + task_instance = TaskInstanceResponse.model_validate_json(response.content) + AirflowConsole().print_as( + data=[{"state": task_instance.state.value if task_instance.state is not None else None}], + output=args.output, + ) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py new file mode 100644 index 0000000000000..b8b375b9badae --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import datetime +from uuid import uuid4 + +from airflowctl.api.client import ClientKind +from airflowctl.api.datamodels.generated import TaskInstanceResponse, TaskInstanceState +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import task_command + + +class TestTaskCommands: + parser = cli_parser.get_parser() + + def test_task_state(self, api_client_maker, capsys): + response = TaskInstanceResponse( + id=uuid4(), + task_id="runme_0", + dag_id="example_bash_operator", + dag_run_id="manual__2025-01-01T00:00:00+00:00", + map_index=-1, + logical_date=datetime.datetime(2025, 1, 1, 0, 0, 0), + run_after=datetime.datetime(2025, 1, 1, 0, 0, 0), + start_date=None, + end_date=None, + duration=None, + state=TaskInstanceState.SUCCESS, + try_number=1, + max_tries=1, + task_display_name="runme_0", + dag_display_name="example_bash_operator", + hostname=None, + unixname=None, + pool="default_pool", + pool_slots=1, + queue=None, + priority_weight=None, + operator=None, + operator_name=None, + queued_when=None, + scheduled_when=None, + pid=None, + executor=None, + executor_config="{}", + note=None, + rendered_map_index=None, + rendered_fields=None, + trigger=None, + triggerer_job=None, + dag_version=None, + ) + api_client = api_client_maker( + path="/api/v2/dags/example_bash_operator/dagRuns/manual__2025-01-01T00:00:00+00:00/taskInstances/runme_0", + response_json=response.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + + args = self.parser.parse_args( + [ + "tasks", + "state", + "--dag-id", + "example_bash_operator", + "--dag-run-id", + "manual__2025-01-01T00:00:00+00:00", + "--task-id", + "runme_0", + ] + ) + + task_command.state(args, api_client=api_client) + + captured = capsys.readouterr() + assert '"state": "success"' in captured.out