Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 34 additions & 37 deletions airflow-core/src/airflow/cli/commands/variable_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@

import json
import os
from typing import TYPE_CHECKING

from airflowctl.api.datamodels.generated import (
BulkActionOnExistence,
BulkBodyVariableBody,
BulkCreateActionVariableBody,
VariableBody,
)
from sqlalchemy import select

from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import SENSITIVE_PLACEHOLDER, print_export_output
from airflow.cli.utils import SENSITIVE_PLACEHOLDER, deprecated_for_airflowctl, print_export_output
from airflow.exceptions import (
AirflowFileParseException,
AirflowUnsupportedFileTypeException,
Expand All @@ -37,10 +43,7 @@
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, create_session, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
from airflow.utils.session import create_session


class VariableDisplayMapper:
Expand Down Expand Up @@ -124,9 +127,11 @@ def variables_delete(args):


@cli_utils.action_cli
@deprecated_for_airflowctl("airflowctl variables import")
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def variables_import(args, *, session: Session = NEW_SESSION):
@provide_api_client
def variables_import(args, api_client: Client = NEW_API_CLIENT):
"""Import variables from a given file."""
if not os.path.exists(args.file):
raise SystemExit("Missing variables file.")
Expand All @@ -140,36 +145,28 @@ def variables_import(args, *, session: Session = NEW_SESSION):
except Exception as e:
raise SystemExit(f"Failed to load variables file: {e}")

suc_count = fail_count = 0
skipped = set()
action_on_existing = args.action_on_existing_key
existing_keys = set()
if action_on_existing != "overwrite":
existing_keys = set(session.scalars(select(Variable.key).where(Variable.key.in_(var_json))))
if action_on_existing == "fail" and existing_keys:
raise SystemExit(f"Failed. These keys: {sorted(existing_keys)} already exists.")
entities = []
for k, v in var_json.items():
if action_on_existing == "skip" and k in existing_keys:
skipped.add(k)
continue
try:
value = v
description = None
if isinstance(v, dict) and "value" in v: # verify that var configuration has value
value, description = v["value"], v.get("description")
Variable.set(k, value, description, serialize_json=not isinstance(value, str))
except Exception as e:
print(f"Variable import failed: {e!r}")
fail_count += 1
else:
suc_count += 1
print(f"{suc_count} of {len(var_json)} variables successfully updated.")
if fail_count:
print(f"{fail_count} variable(s) failed to be updated.")
if skipped:
print(
f"The variables with these keys: {list(sorted(skipped))} were skipped because they already exists"
)
value, description = v, None
if isinstance(v, dict) and "value" in v: # verify that var configuration has value
value, description = v["value"], v.get("description")
entities.append(VariableBody(key=k, value=value, description=description))

bulk_body = BulkBodyVariableBody(
actions=[
BulkCreateActionVariableBody(
action="create",
entities=entities,
action_on_existence=BulkActionOnExistence(args.action_on_existing_key),
)
]
)
result = api_client.variables.bulk(variables=bulk_body)
if result.create and result.create.errors:
raise SystemExit(f"Failed to import variables: {result.create.errors}")

success = result.create.success if result.create else []
print(f"{len(success)} of {len(var_json)} variables successfully updated.")


@providers_configuration_loaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import pytest

from airflow.cli.commands import asset_command, dag_command, pool_command
from airflow.cli.commands import asset_command, dag_command, pool_command, variable_command
from airflow.exceptions import RemovedInAirflow4Warning

# (command callable, argv to parse, expected airflowctl replacement named in the warning)
Expand All @@ -52,6 +52,11 @@
["assets", "materialize", "--name=foo"],
"airflowctl assets materialize",
),
(
variable_command.variables_import,
["variables", "import", "/nonexistent.json"],
"airflowctl variables import",
),
]


Expand Down
Loading
Loading