diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index b8b49ad3ae778..e1cfc665804d9 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -86,6 +86,7 @@ def date_param(): "dags trigger example_bash_operator --logical-date={date_param} --run-after={date_param}", # Test trigger without logical-date (should default to now) "dags trigger example_bash_operator", + "dags next-execution example_bash_operator", "dags pause example_bash_operator", "dags unpause example_bash_operator", # Dag Run commands diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8e590f3b820cd..c6f95ad81db92 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -4,7 +4,7 @@ auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 config:a3d936cb15fe3b547bf6c82cf93d923f connections:942f9f88cb908c28bf5c19159fc5065b -dags:e2a18f90b1bd150be981cef6fef91858 +dags:0679bdf1ec6e602b804973388be8ac7f dagrun:c32e0011aa9a845456c778786717208e jobs:a5b644c5da8889443bb40ee10b599270 pools:19efe105b9515ab1926ebcaf0e028d71 diff --git a/airflow-ctl/docs/images/output_dags.svg b/airflow-ctl/docs/images/output_dags.svg index f38d12d60e343..54156fe72cc76 100644 --- a/airflow-ctl/docs/images/output_dags.svg +++ b/airflow-ctl/docs/images/output_dags.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + - + - + - - Usage:airflowctl dags [-hCOMMAND... - -Perform Dags operations - -Positional Arguments: -COMMAND -deleteDelete a Dag by its ID -getRetrieve a Dag by its ID -get-detailsRetrieve detailed information for a Dag -get-import-errorRetrieve a Dag import error by its ID -get-statsRetrieve run statistics for one or more Dags -get-tagsList all tags used across Dags -get-versionRetrieve a specific version of a Dag -listList all Dags -list-import-errors -List all Dag import errors -list-versionList all versions of a Dag -list-warningList all Dag warnings -pausePause a Dag -triggerTrigger a new Dag run -unpauseUnpause a Dag -updateUpdate properties of a Dag - -Options: --h--helpshow this help message and exit + + Usage:airflowctl dags [-hCOMMAND... + +Perform Dags operations + +Positional Arguments: +COMMAND +deleteDelete a Dag by its ID +getRetrieve a Dag by its ID +get-detailsRetrieve detailed information for a Dag +get-import-errorRetrieve a Dag import error by its ID +get-statsRetrieve run statistics for one or more Dags +get-tagsList all tags used across Dags +get-versionRetrieve a specific version of a Dag +listList all Dags +list-import-errors +List all Dag import errors +list-versionList all versions of a Dag +list-warningList all Dag warnings +next-executionShow the next scheduled execution time for a DAG +pausePause a Dag +triggerTrigger a new Dag run +unpauseUnpause a Dag +updateUpdate properties of a Dag + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 1d3c30121b619..555183dadfb63 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -959,6 +959,15 @@ def merge_commands( ) DAG_COMMANDS = ( + ActionCommand( + name="next-execution", + help="Show the next scheduled execution time for a DAG", + func=lazy_load_command("airflowctl.ctl.commands.dag_command.next_execution"), + args=( + ARG_DAG_ID, + ARG_OUTPUT, + ), + ), ActionCommand( name="pause", help="Pause a Dag", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py index 33b3d7c95fefa..301821f9c2c3c 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/dag_command.py @@ -72,3 +72,34 @@ def unpause(args, api_client=NEW_API_CLIENT) -> None: api_client=api_client, output=args.output, ) + + +_NEXT_EXECUTION_FIELDS = ( + "next_dagrun_logical_date", + "next_dagrun_data_interval_start", + "next_dagrun_data_interval_end", + "next_dagrun_run_after", +) + + +@provide_api_client(kind=ClientKind.CLI) +def next_execution(args, api_client=NEW_API_CLIENT) -> dict | None: + """Show next scheduled execution time for a DAG.""" + try: + response = api_client.dags.get(dag_id=args.dag_id) + except ServerResponseError as e: + rich.print(f"[red]Error retrieving DAG {args.dag_id}: {e}[/red]") + sys.exit(1) + + next_exec_data = {field: getattr(response, field) for field in _NEXT_EXECUTION_FIELDS} + + if all(value is None for value in next_exec_data.values()): + rich.print(f"[yellow]No upcoming run scheduled for DAG {args.dag_id}.[/yellow]") + return None + + result = next_exec_data + AirflowConsole().print_as( + data=[result], + output=args.output, + ) + return result diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py index 8eb65d4668688..405eb030065f8 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_dag_command.py @@ -90,6 +90,36 @@ class TestDagCommands: is_stale=False, ) + dag_response_no_schedule = DAGResponse( + dag_id=dag_id, + dag_display_name=dag_display_name, + is_paused=True, + last_parsed_time=datetime.datetime(2024, 12, 31, 23, 59, 59), + last_expired=datetime.datetime(2025, 1, 1, 0, 0, 0), + fileloc="fileloc", + relative_fileloc="relative_fileloc", + description="description", + timetable_summary=None, + timetable_description=None, + timetable_partitioned=False, + timetable_periodic=False, + tags=[], + max_active_tasks=1, + max_active_runs=1, + max_consecutive_failed_dag_runs=1, + has_task_concurrency_limits=False, + has_import_errors=False, + next_dagrun_logical_date=None, + next_dagrun_data_interval_start=None, + next_dagrun_data_interval_end=None, + next_dagrun_run_after=None, + owners=["apache-airflow"], + is_backfillable=False, + file_token="file_token", + bundle_name="bundle_name", + is_stale=False, + ) + def test_pause_dag(self, api_client_maker, monkeypatch): api_client = api_client_maker( path=f"/api/v2/dags/{self.dag_id}", @@ -143,3 +173,45 @@ def test_unpause_fail(self, api_client_maker, monkeypatch): self.parser.parse_args(["dags", "unpause", self.dag_id]), api_client=api_client, ) + + def test_next_execution(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json=self.dag_response_paused.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + result = dag_command.next_execution( + self.parser.parse_args(["dags", "next-execution", self.dag_id]), + api_client=api_client, + ) + assert result["next_dagrun_logical_date"] == datetime.datetime(2025, 1, 1, 0, 0, 0) + assert result["next_dagrun_data_interval_start"] == datetime.datetime(2025, 1, 1, 0, 0, 0) + assert result["next_dagrun_data_interval_end"] == datetime.datetime(2025, 1, 1, 0, 0, 0) + assert result["next_dagrun_run_after"] == datetime.datetime(2025, 1, 1, 0, 0, 0) + + def test_next_execution_no_schedule(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json=self.dag_response_no_schedule.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + result = dag_command.next_execution( + self.parser.parse_args(["dags", "next-execution", self.dag_id]), + api_client=api_client, + ) + assert result is None + + def test_next_execution_fail(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}", + response_json={"detail": "DAG not found"}, + expected_http_status_code=404, + kind=ClientKind.CLI, + ) + with pytest.raises(SystemExit): + dag_command.next_execution( + self.parser.parse_args(["dags", "next-execution", self.dag_id]), + api_client=api_client, + )