From f15fd365c69fe5448f8067dd388d811534b9dd7f Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 16 Jun 2026 09:45:21 +0900 Subject: [PATCH] refactor: [AIP-94] airflowctl variables: add set command Signed-off-by: PoAn Yang --- .../airflow/cli/commands/variable_command.py | 30 +++++- .../cli/commands/test_command_deprecations.py | 3 +- .../cli/commands/test_variable_command.py | 98 ++++++++++--------- .../test_airflowctl_commands.py | 5 + airflow-ctl/docs/images/command_hashes.txt | 2 +- airflow-ctl/docs/images/output_variables.svg | 90 +++++++++-------- airflow-ctl/src/airflowctl/ctl/cli_config.py | 21 ++++ .../ctl/commands/variable_command.py | 19 ++++ .../ctl/commands/test_variable_command.py | 34 +++++++ 9 files changed, 209 insertions(+), 93 deletions(-) diff --git a/airflow-core/src/airflow/cli/commands/variable_command.py b/airflow-core/src/airflow/cli/commands/variable_command.py index 5216aabb446c6..b2faf66fcdc09 100644 --- a/airflow-core/src/airflow/cli/commands/variable_command.py +++ b/airflow-core/src/airflow/cli/commands/variable_command.py @@ -23,10 +23,17 @@ import os from typing import TYPE_CHECKING +from airflowctl.api.datamodels.generated import ( + BulkActionOnExistence, + BulkBodyVariableBody, + BulkCreateActionVariableBody, + VariableBody, +) from sqlalchemy import select +from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client from airflow.cli.simple_table import AirflowConsole -from airflow.cli.utils import SENSITIVE_PLACEHOLDER, print_export_output +from airflow.cli.utils import SENSITIVE_PLACEHOLDER, deprecated_for_airflowctl, print_export_output from airflow.exceptions import ( AirflowFileParseException, AirflowUnsupportedFileTypeException, @@ -108,10 +115,25 @@ def variables_get(args): @cli_utils.action_cli +@deprecated_for_airflowctl("airflowctl variables set") +@suppress_logs_and_warning @providers_configuration_loaded -def variables_set(args): - """Create new variable with a given name, value and description.""" - Variable.set(args.key, args.value, args.description, serialize_json=args.json) +@provide_api_client +def variables_set(args, api_client: Client = NEW_API_CLIENT): + """Set a variable, creating it if it does not exist and updating it otherwise.""" + value = args.value + if args.json: + value = json.dumps(value, indent=2) + bulk_body = BulkBodyVariableBody( + actions=[ + BulkCreateActionVariableBody( + action="create", + entities=[VariableBody(key=args.key, value=value, description=args.description)], + action_on_existence=BulkActionOnExistence.OVERWRITE, + ) + ] + ) + api_client.variables.bulk(variables=bulk_body) print(f"Variable {args.key} created") diff --git a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py index b4eb6840c9069..c329278a66c5a 100644 --- a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py +++ b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py @@ -30,7 +30,7 @@ import pytest -from airflow.cli.commands import asset_command, dag_command, pool_command +from airflow.cli.commands import asset_command, dag_command, pool_command, variable_command from airflow.exceptions import RemovedInAirflow4Warning # (command callable, argv to parse, expected airflowctl replacement named in the warning) @@ -52,6 +52,7 @@ ["assets", "materialize", "--name=foo"], "airflowctl assets materialize", ), + (variable_command.variables_set, ["variables", "set", "foo", "bar"], "airflowctl variables set"), ] diff --git a/airflow-core/tests/unit/cli/commands/test_variable_command.py b/airflow-core/tests/unit/cli/commands/test_variable_command.py index a02c95aa31722..0305a1cc33f42 100644 --- a/airflow-core/tests/unit/cli/commands/test_variable_command.py +++ b/airflow-core/tests/unit/cli/commands/test_variable_command.py @@ -24,6 +24,7 @@ import pytest import yaml +from airflowctl.api.datamodels.generated import BulkActionOnExistence from sqlalchemy import select from airflow import models @@ -129,14 +130,26 @@ def setup_method(self): def teardown_method(self): clear_db_variables() - def test_variables_set(self): + @staticmethod + def _set_entity(mock_cli_api_client): + """Return the single VariableBody sent by ``set`` through the bulk request.""" + bulk_body = mock_cli_api_client.variables.bulk.call_args.kwargs["variables"] + action = bulk_body.actions[0] + assert action.action == "create" + assert action.action_on_existence == BulkActionOnExistence.OVERWRITE + return action.entities[0] + + def test_variables_set(self, mock_cli_api_client): """Test variable_set command""" variable_command.variables_set(self.parser.parse_args(["variables", "set", "foo", "bar"])) - assert Variable.get("foo") is not None - with pytest.raises(KeyError): - Variable.get("foo1") - def test_variables_set_with_description(self): + mock_cli_api_client.variables.bulk.assert_called_once() + entity = self._set_entity(mock_cli_api_client) + assert entity.key == "foo" + assert entity.value.root == "bar" + assert entity.description is None + + def test_variables_set_with_description(self, mock_cli_api_client): """Test variable_set command with optional description argument""" expected_var_desc = "foo_bar_description" var_key = "foo" @@ -144,13 +157,15 @@ def test_variables_set_with_description(self): self.parser.parse_args(["variables", "set", var_key, "bar", "--description", expected_var_desc]) ) - assert Variable.get(var_key) == "bar" - with create_session() as session: - actual_var_desc = session.scalar(select(Variable.description).where(Variable.key == var_key)) - assert actual_var_desc == expected_var_desc + assert self._set_entity(mock_cli_api_client).description == expected_var_desc - with pytest.raises(KeyError): - Variable.get("foo1") + def test_variables_set_serialize_json(self, mock_cli_api_client): + """Test variable_set command with json argument""" + variable_command.variables_set( + self.parser.parse_args(["variables", "set", "foo", '{"a": 1}', "--json"]) + ) + + assert self._set_entity(mock_cli_api_client).value.root == json.dumps('{"a": 1}', indent=2) def test_variables_get(self, stdout_capture): Variable.set("foo", {"foo": "bar"}, serialize_json=True) @@ -171,25 +186,19 @@ def test_get_variable_missing_variable(self): variable_command.variables_get(self.parser.parse_args(["variables", "get", "no-existing-VAR"])) def test_variables_set_different_types(self): - """Test storage of various data types""" - # Set a dict - variable_command.variables_set( - self.parser.parse_args(["variables", "set", "dict", '{"foo": "oops"}']) - ) - # Set a list - variable_command.variables_set(self.parser.parse_args(["variables", "set", "list", '["oops"]'])) - # Set str - variable_command.variables_set(self.parser.parse_args(["variables", "set", "str", "hello string"])) - # Set int - variable_command.variables_set(self.parser.parse_args(["variables", "set", "int", "42"])) - # Set float - variable_command.variables_set(self.parser.parse_args(["variables", "set", "float", "42.0"])) - # Set true - variable_command.variables_set(self.parser.parse_args(["variables", "set", "true", "true"])) - # Set false - variable_command.variables_set(self.parser.parse_args(["variables", "set", "false", "false"])) - # Set none - variable_command.variables_set(self.parser.parse_args(["variables", "set", "null", "null"])) + """Test export/import round-trips storage of various data types. + + ``set`` is migrated to the airflowctl client, so the variables are seeded directly + through the model here; ``export``/``import`` remain local DB commands. + """ + Variable.set("dict", '{"foo": "oops"}') + Variable.set("list", '["oops"]') + Variable.set("str", "hello string") + Variable.set("int", "42") + Variable.set("float", "42.0") + Variable.set("true", "true") + Variable.set("false", "false") + Variable.set("null", "null") # Export and then import variable_command.variables_export( @@ -210,8 +219,8 @@ def test_variables_set_different_types(self): assert Variable.get("null", deserialize_json=True) is None # test variable import skip existing - # set varliable list to ["airflow"] and have it skip during import - variable_command.variables_set(self.parser.parse_args(["variables", "set", "list", '["airflow"]'])) + # set variable list to ["airflow"] and have it skip during import + Variable.set("list", '["airflow"]') variable_command.variables_import( self.parser.parse_args( ["variables", "import", "variables_types.json", "--action-on-existing-key", "skip"] @@ -325,8 +334,8 @@ def test_variables_list_edge_cases(self): assert item["val"] == "***" def test_variables_delete(self): - """Test variable_delete command""" - variable_command.variables_set(self.parser.parse_args(["variables", "set", "foo", "bar"])) + """Test variable_delete command (``set`` is migrated, so seed via the model)""" + Variable.set("foo", "bar") variable_command.variables_delete(self.parser.parse_args(["variables", "delete", "foo"])) with pytest.raises(KeyError): Variable.get("foo") @@ -365,13 +374,13 @@ def test_variables_isolation(self, tmp_path): path1 = tmp_path / "testfile1.json" path2 = tmp_path / "testfile2.json" - # First export - variable_command.variables_set(self.parser.parse_args(["variables", "set", "foo", '{"foo":"bar"}'])) - variable_command.variables_set(self.parser.parse_args(["variables", "set", "bar", "original"])) + # First export (``set`` is migrated to airflowctl, so seed via the model) + Variable.set("foo", '{"foo":"bar"}') + Variable.set("bar", "original") variable_command.variables_export(self.parser.parse_args(["variables", "export", os.fspath(path1)])) - variable_command.variables_set(self.parser.parse_args(["variables", "set", "bar", "updated"])) - variable_command.variables_set(self.parser.parse_args(["variables", "set", "foo", '{"foo":"oops"}'])) + Variable.set("bar", "updated") + Variable.set("foo", '{"foo":"oops"}') variable_command.variables_delete(self.parser.parse_args(["variables", "delete", "foo"])) with create_session() as session: variable_command.variables_import( @@ -389,13 +398,10 @@ def test_variables_isolation(self, tmp_path): def test_variables_import_and_export_with_description(self, tmp_path): """Test variables_import with file-description parameter""" variables_types_file = tmp_path / "variables_types.json" - variable_command.variables_set( - self.parser.parse_args(["variables", "set", "foo", "bar", "--description", "Foo var description"]) - ) - variable_command.variables_set( - self.parser.parse_args(["variables", "set", "foo1", "bar1", "--description", "12"]) - ) - variable_command.variables_set(self.parser.parse_args(["variables", "set", "foo2", "bar2"])) + # ``set`` is migrated to airflowctl, so seed the variables via the model + Variable.set("foo", "bar", description="Foo var description") + Variable.set("foo1", "bar1", description="12") + Variable.set("foo2", "bar2") variable_command.variables_export( self.parser.parse_args(["variables", "export", os.fspath(variables_types_file)]) ) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index e1cfc665804d9..2d3620119e560 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -120,6 +120,11 @@ def date_param(): "variables get test_key", "variables get test_key -o table", "variables update --key=test_key --value=updated_value", + # set is an upsert: the first call creates the key, the second updates the existing key. + "variables set test_set_key set_value", + "variables set test_set_key updated_set_value", + "variables get test_set_key", + "variables delete test_set_key", "variables import tests/airflowctl_tests/fixtures/test_variables.json", "variables delete test_key", "variables delete test_import_var", diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 53c93e7546d1e..9fc2811f099bb 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -9,7 +9,7 @@ dagrun:c32e0011aa9a845456c778786717208e jobs:a5b644c5da8889443bb40ee10b599270 pools:19efe105b9515ab1926ebcaf0e028d71 providers:34502fe09dc0b8b0a13e7e46efdffda6 -variables:f8fc76d3d398b2780f4e97f7cd816646 +variables:68cf6c7b27960c35e5e96895053a349f version:31f4efdf8de0dbaaa4fac71ff7efecc3 plugins:4864fd8f356704bd2b3cd1aec3567e35 auth login:9fe2bb1dd5c602beea2eefb33a2b20a8 diff --git a/airflow-ctl/docs/images/output_variables.svg b/airflow-ctl/docs/images/output_variables.svg index a8833a923899d..91dfe2863342b 100644 --- a/airflow-ctl/docs/images/output_variables.svg +++ b/airflow-ctl/docs/images/output_variables.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - + - - Usage:airflowctl variables [-hCOMMAND... - -Perform Variables operations - -Positional Arguments: -COMMAND -createCreate a new variable -deleteDelete a variable by its key -getRetrieve a variable by its key -importImport variables from a file exported with local CLI. -listList all variables -updateUpdate an existing variable - -Options: --h--helpshow this help message and exit + + Usage:airflowctl variables [-hCOMMAND... + +Perform Variables operations + +Positional Arguments: +COMMAND +createCreate a new variable +deleteDelete a variable by its key +getRetrieve a variable by its key +importImport variables from a file exported with local CLI. +listList all variables +setSet a variable, creating it if it does not exist and updating +it otherwise. +updateUpdate an existing variable + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 11ff4542e01ef..b26abb3fcb81d 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -276,6 +276,21 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: choices=("overwrite", "fail", "skip"), ) +# Variable command args +ARG_VAR_KEY = Arg(flags=("key",), type=str, help="Variable key") +ARG_VAR_VALUE = Arg(flags=("value",), metavar="VALUE", type=str, help="Variable value") +ARG_VAR_DESCRIPTION = Arg( + flags=("--description",), + type=str, + default=None, + help="Variable description, optional when setting a variable", +) +ARG_VAR_SERIALIZE_JSON = Arg( + flags=("-j", "--json"), + action="store_true", + help="Serialize JSON variable", +) + # Config arguments ARG_CONFIG_SECTION = Arg( flags=("--section",), @@ -1007,6 +1022,12 @@ def merge_commands( ) VARIABLE_COMMANDS = ( + ActionCommand( + name="set", + help="Set a variable, creating it if it does not exist and updating it otherwise.", + func=lazy_load_command("airflowctl.ctl.commands.variable_command.set_"), + args=(ARG_VAR_KEY, ARG_VAR_VALUE, ARG_VAR_DESCRIPTION, ARG_VAR_SERIALIZE_JSON), + ), ActionCommand( name="import", help="Import variables from a file exported with local CLI.", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py index 19321002e4442..fad05cb728791 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py @@ -36,6 +36,25 @@ def _print_file_error(message: str, file_path: str) -> None: Console().print(f"[red]{message}: {file_path}", soft_wrap=True) +@provide_api_client(kind=ClientKind.CLI) +def set_(args, api_client=NEW_API_CLIENT) -> None: + """Set a variable, creating it if it does not exist and updating it otherwise.""" + value = args.value + if args.json: + value = json.dumps(value) + bulk_body = BulkBodyVariableBody( + actions=[ + BulkCreateActionVariableBody( + action="create", + entities=[VariableBody(key=args.key, value=value, description=args.description)], + action_on_existence=BulkActionOnExistence.OVERWRITE, + ) + ] + ) + api_client.variables.bulk(variables=bulk_body) + rich.print(f"[green]Variable {args.key} set[/green]") + + @provide_api_client(kind=ClientKind.CLI) def import_(args, api_client=NEW_API_CLIENT) -> list[str]: """Import variables from a given file.""" diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py index f573585935fd4..abdebf1bca2c8 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py @@ -63,6 +63,40 @@ class TestCliVariableCommands: delete=None, ) + def _captured_set_entity(self, argv): + """Run ``set`` against a capturing client and return the variable entity it sent.""" + captured: dict = {} + + def bulk(variables): + captured["body"] = variables + return self.bulk_response_success + + api_client = SimpleNamespace(variables=SimpleNamespace(bulk=bulk)) + variable_command.set_(self.parser.parse_args(argv), api_client=api_client) + + action = captured["body"].actions[0] + assert action.action == "create" + assert action.action_on_existence == "overwrite" + return action.entities[0] + + def test_set(self): + """Test variable_set command""" + entity = self._captured_set_entity(["variables", "set", "new_key", "new_value"]) + assert entity.key == "new_key" + assert entity.value.root == "new_value" + + def test_set_serialize_json(self): + """Test variable_set command with json argument""" + entity = self._captured_set_entity(["variables", "set", "json_key", '{"a": 1}', "--json"]) + assert entity.value.root == json.dumps('{"a": 1}') + + def test_set_forwards_description(self): + """Test variable_set command with optional description argument""" + entity = self._captured_set_entity( + ["variables", "set", "key", "value", "--description", "a description"] + ) + assert entity.description == "a description" + def test_import_success(self, api_client_maker, tmp_path, monkeypatch): api_client = api_client_maker( path="/api/v2/variables",