Skip to content

Commit

Permalink
fixes airflow provider init sequence (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix authored Aug 20, 2023
1 parent abad005 commit 887cbf9
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
3 changes: 1 addition & 2 deletions dlt/common/configuration/providers/airflow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from airflow.models import Variable

from .toml import VaultTomlProvider


Expand All @@ -13,6 +11,7 @@ def name(self) -> str:

def _look_vault(self, full_key: str, hint: type) -> str:
"""Look up the Airflow Variable named `full_key`.

Returns the value obtained from `Variable.get`, or None if the variable is not found
(`default_var=None` is passed). `hint` is not used by this lookup.
"""
# imported inside the method so that importing this module does not itself import Airflow
from airflow.models import Variable
return Variable.get(full_key, default_var=None) # type: ignore

@property
Expand Down
20 changes: 14 additions & 6 deletions dlt/common/configuration/specs/config_providers_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,36 @@ def _airflow_providers() -> List[ConfigProvider]:
task context will not be available. Still we want the provider to function so
we just test if Airflow can be imported.
"""
if not is_airflow_installed():
return []

providers: List[ConfigProvider] = []

try:
# hide stdio. airflow typically dumps tons of warnings and deprecations to stdout and stderr
with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
from airflow.operators.python import get_current_context # noqa

# try to get dlt secrets variable. many broken Airflow installations break here. in that case do not create the provider
from airflow.models import Variable # noqa
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider
# probe if Airflow variable containing all secrets is present
from dlt.common.configuration.providers.toml import SECRETS_TOML_KEY
secrets_toml_var = Variable.get(SECRETS_TOML_KEY, default_var=None)

# providers can be returned - mind that AirflowSecretsTomlProvider() requests the variable above immediately
providers = [AirflowSecretsTomlProvider()]

# check if we are in task context and provide more info
from airflow.operators.python import get_current_context # noqa
ti = get_current_context()["ti"]

# log outside of stderr/out redirect
if secrets_toml_var is None:
message = f"Airflow variable '{SECRETS_TOML_KEY}' was not found. " + \
"This Airflow variable is a recommended place to hold the content of secrets.toml." + \
"If you do not use Airflow variables to hold dlt configuration or use variables with other names you can ignore this warning."
ti = get_current_context()["ti"]
ti.log.warning(message)

except Exception:
# do not probe variables when not in task context
pass

return [AirflowSecretsTomlProvider()]
# airflow not detected
return providers
4 changes: 4 additions & 0 deletions tests/helpers/airflow_tests/test_airflow_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def test_dag():
from dlt.common.configuration.providers.airflow import AirflowSecretsTomlProvider

Variable.set(SECRETS_TOML_KEY, SECRETS_TOML_CONTENT)
# make sure provider works while creating DAG
provider = AirflowSecretsTomlProvider()
assert provider.get_value("api_key", str, None, "sources")[0] == "test_value"

@task()
def test_task():
Expand Down Expand Up @@ -78,6 +81,7 @@ def test_dag():

# this will initialize provider context
api_key = secrets["sources.api_key"]
assert api_key == "test_value"

@task()
def test_task():
Expand Down

0 comments on commit 887cbf9

Please sign in to comment.