chore(airflow): Post 0.19 cleanup (#478)
* bump version

Signed-off-by: Ankita Katiyar <[email protected]>

* Unbump version and clean test

Signed-off-by: Ankita Katiyar <[email protected]>

* Update e2e tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Update e2e tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Update e2e tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Update e2e tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Split big test into smaller tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Update conftest

Signed-off-by: Ankita Katiyar <[email protected]>

* Update conftest

Signed-off-by: Ankita Katiyar <[email protected]>

* Fix coverage

Signed-off-by: Ankita Katiyar <[email protected]>

* Try unpin airflow

Signed-off-by: Ankita Katiyar <[email protected]>

* remove datacatalog step

Signed-off-by: Ankita Katiyar <[email protected]>

* Change node

Signed-off-by: Ankita Katiyar <[email protected]>

* update tasks test step

Signed-off-by: Ankita Katiyar <[email protected]>

* Revert to older airflow and constraint pendulum

Signed-off-by: Ankita Katiyar <[email protected]>

* Update template

Signed-off-by: Ankita Katiyar <[email protected]>

* Update message in e2e step

Signed-off-by: Ankita Katiyar <[email protected]>

* Final cleanup

Signed-off-by: Ankita Katiyar <[email protected]>

* Update kedro-airflow/pyproject.toml

Signed-off-by: Nok Lam Chan <[email protected]>

* Pin apache-airflow again

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Nok Lam Chan <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
ankatiyar and noklam authored Dec 20, 2023
1 parent 124c6c9 commit cafa0f9
Showing 7 changed files with 62 additions and 108 deletions.
12 changes: 5 additions & 7 deletions kedro-airflow/features/airflow.feature
@@ -7,21 +7,19 @@ Feature: Airflow
     Given I have installed kedro version "latest"
     And I have prepared a config file
     And I have run a non-interactive kedro new
-    And I have prepared a data catalog
     And I have executed the kedro command "airflow create -t ../airflow/dags/"
     When I execute the airflow command "tasks list project-dummy"
     Then I should get a successful exit code
-    And I should get a message including "split"
-    And I should get a message including "make-predictions"
-    And I should get a message including "report-accuracy"
+    And I should get a message including "create-model-input-table-node"
+    And I should get a message including "preprocess-companies-node"
+    And I should get a message including "preprocess-shuttles-node"
 
   Scenario: Run Airflow task locally with latest Kedro
     Given I have installed kedro version "latest"
     And I have prepared a config file
     And I have run a non-interactive kedro new
-    And I have prepared a data catalog
     And I have executed the kedro command "airflow create -t ../airflow/dags/"
     And I have installed the kedro project package
-    When I execute the airflow command "tasks test project-dummy split 2016-06-01T00:00:00+00:00"
+    When I execute the airflow command "tasks test project-dummy preprocess-companies-node"
     Then I should get a successful exit code
-    And I should get a message including "Loading data from 'parameters'"
+    And I should get a message including "Loading data from companies"
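The renamed assertions track the move from the retired `pandas-iris` starter to `spaceflights-pandas`. kedro-airflow turns underscored Kedro node names into the hyphenated Airflow task IDs asserted above; a minimal sketch of that slugification, assuming the plugin normalises names roughly like this (the helper below is illustrative, not taken from this diff):

```python
import re

def clean_name(name: str) -> str:
    # Collapse runs of underscores/non-word characters into single hyphens.
    return re.sub(r"[\W_]+", "-", name).strip("-")

# Node names from the spaceflights-pandas starter -> asserted task IDs.
assert clean_name("preprocess_companies_node") == "preprocess-companies-node"
assert clean_name("create_model_input_table_node") == "create-model-input-table-node"
```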
70 changes: 2 additions & 68 deletions kedro-airflow/features/steps/cli_steps.py
@@ -16,72 +16,6 @@ def init_airflow(context, home_dir):
     assert res.returncode == 0
 
 
-@given("I have prepared an old data catalog")
-def prepare_old_catalog(context):
-    config = {
-        "example_train_x": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_train_x.pkl",
-        },
-        "example_train_y": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_train_y.pkl",
-        },
-        "example_test_x": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_test_x.pkl",
-        },
-        "example_test_y": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_test_y.pkl",
-        },
-        "example_model": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_model.pkl",
-        },
-        "example_predictions": {
-            "type": "PickleLocalDataset",
-            "filepath": "data/02_intermediate/example_predictions.pkl",
-        },
-    }
-    catalog_file = context.root_project_dir / "conf" / "local" / "catalog.yml"
-    with catalog_file.open("w") as catalog_file:
-        yaml.dump(config, catalog_file, default_flow_style=False)
-
-
-@given("I have prepared a data catalog")
-def prepare_catalog(context):
-    config = {
-        "example_train_x": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_train_x.pkl",
-        },
-        "example_train_y": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_train_y.pkl",
-        },
-        "example_test_x": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_test_x.pkl",
-        },
-        "example_test_y": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_test_y.pkl",
-        },
-        "example_model": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_model.pkl",
-        },
-        "example_predictions": {
-            "type": "pickle.PickleDataset",
-            "filepath": "data/02_intermediate/example_predictions.pkl",
-        },
-    }
-    catalog_file = context.root_project_dir / "conf" / "local" / "catalog.yml"
-    with catalog_file.open("w") as catalog_file:
-        yaml.dump(config, catalog_file, default_flow_style=False)
-
-
 @given('I have installed kedro version "{version}"')
 def install_kedro(context, version):
     """Execute Kedro command and check the status."""
@@ -100,7 +34,7 @@ def install_kedro(context, version):
 @given("I have installed the kedro project package")
 def install_project_package(context):
     """Install the packaged project."""
-    cmd = [context.pip, "install", "-e", "src/"]
+    cmd = [context.pip, "install", "-e", "."]
     res = run(cmd, env=context.env, cwd=str(context.root_project_dir))
 
     if res.returncode != OK_EXIT_CODE:
@@ -159,7 +93,7 @@ def create_project_from_config_file(context):
             "-c",
             str(context.config_file),
             "--starter",
-            "pandas-iris",
+            "spaceflights-pandas",
         ],
         env=context.env,
         cwd=str(context.temp_dir),
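With Kedro 0.19 the project's packaging metadata (`pyproject.toml`) lives at the project root, so the e2e step installs from `"."` instead of `"src/"`. A self-contained sketch of the updated step, assuming `pip` points at the test environment's pip executable:

```python
from pathlib import Path
from subprocess import run

def install_project_package(pip: str, project_dir: Path) -> None:
    # Editable install from the project root, where Kedro 0.19 keeps
    # pyproject.toml; before 0.19 this step pointed at src/.
    res = run([pip, "install", "-e", "."], cwd=str(project_dir))
    assert res.returncode == 0, "editable install failed"
```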
3 changes: 1 addition & 2 deletions kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -30,8 +30,7 @@ class KedroOperator(BaseOperator):
 
     def execute(self, context):
         configure_project(self.package_name)
-        with KedroSession.create(self.package_name,
-                                 self.project_path,
+        with KedroSession.create(project_path=self.project_path,
                                  env=self.env) as session:
             session.run(self.pipeline_name, node_names=[self.node_name])
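Kedro 0.19 dropped the positional `package_name` argument from `KedroSession.create()`; the template now passes `project_path` as a keyword and relies on the preceding `configure_project()` call. A minimal sketch of the pattern the generated DAG uses, assuming an installed packaged project (the package and node names below are placeholders):

```python
from kedro.framework.project import configure_project
from kedro.framework.session import KedroSession

configure_project("fake_project")  # placeholder package name
with KedroSession.create(project_path=".", env="local") as session:
    # Each Airflow task runs exactly one node of the pipeline.
    session.run("__default__", node_names=["preprocess-companies-node"])
```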
2 changes: 1 addition & 1 deletion kedro-airflow/kedro_airflow/plugin.py
@@ -39,7 +39,7 @@ def _load_config(context: KedroContext) -> dict[str, Any]:
     # Backwards compatibility for ConfigLoader that does not support `config_patterns`
     config_loader = context.config_loader
     if not hasattr(config_loader, "config_patterns"):
-        return config_loader.get("airflow*", "airflow/**")
+        return config_loader.get("airflow*", "airflow/**")  # pragma: no cover
 
     # Set the default pattern for `airflow` if not provided in `settings.py`
     if "airflow" not in config_loader.config_patterns.keys():
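The legacy branch (a `ConfigLoader` without `config_patterns`) is now marked `# pragma: no cover` because the suite only exercises pattern-aware loaders. A sketch of the path modern loaders take, with an `OmegaConfigLoader` built outside a Kedro context purely for illustration:

```python
from kedro.config import MissingConfigException, OmegaConfigLoader

loader = OmegaConfigLoader(conf_source="conf", base_env="base", default_run_env="local")
# Register a default pattern for `airflow` if settings.py did not.
if "airflow" not in loader.config_patterns:
    loader.config_patterns.update({"airflow": ["airflow*", "airflow/**"]})
try:
    airflow_config = loader["airflow"]  # merged base + run-env config
except MissingConfigException:
    airflow_config = {}  # no airflow*.yml in the project
```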
3 changes: 2 additions & 1 deletion kedro-airflow/pyproject.toml
@@ -24,12 +24,13 @@ Tracker = "https://github.com/kedro-org/kedro-plugins/issues"
 
 [project.optional-dependencies]
 test = [
-    "apache-airflow<2.7.0", # TODO: Temporary fix, make kedro-airflow compatible with new version of airflow
+    "apache-airflow<2.7.0", # TODO: Temporary fix, to be reverted
     "bandit",
     "behave",
     "black~=22.0",
     "connexion<3.0.0", # TODO: Temporary fix, connexion has changed their API, but airflow hasn't caught up yet
     "kedro-datasets",
+    "pendulum<3.0.0", # TODO: Also to be removed
     "pre-commit>=2.9.2",
     "pytest",
     "pytest-cov",
10 changes: 4 additions & 6 deletions kedro-airflow/tests/conftest.py
@@ -30,9 +30,9 @@ def cli_runner():
 
 def _create_kedro_settings_py(file_name: Path, patterns: list[str]):
     patterns = ", ".join([f'"{p}"' for p in patterns])
-    content = f"""from kedro.config import OmegaConfigLoader
-CONFIG_LOADER_CLASS = OmegaConfigLoader
-CONFIG_LOADER_ARGS = {{
+    content = f"""CONFIG_LOADER_ARGS = {{
+    "base_env": "base",
+    "default_run_env": "local",
     "config_patterns": {{
         "airflow": [{patterns}],  # configure the pattern for configuration files
     }}
@@ -53,14 +53,12 @@ def kedro_project(cli_runner):
         "python_package": "fake_project",
         "include_example": True,
     }
-
     cookiecutter(
         str(TEMPLATE_PATH),
         output_dir=config["output_dir"],
         no_input=True,
        extra_context=config,
     )
-
     pipeline_registry_py = """
 from kedro.pipeline import Pipeline, node
@@ -106,7 +104,7 @@ def metadata(kedro_project):
         "hello_world",
         "Hello world !!!",
         project_path,
-        kedro_version,
         project_path / "src",
+        kedro_version,
         "none",
     )
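The `metadata` fixture moves `kedro_version` after `project_path / "src"` because the field order of `ProjectMetadata` changed in Kedro 0.19 (`source_dir` now precedes `kedro_version`). A hedged sketch of the reordered positional call as it stood at the time of this change; the field-name comments are inferred, not shown in the diff:

```python
from pathlib import Path

from kedro import __version__ as kedro_version
from kedro.framework.startup import ProjectMetadata

project_path = Path.cwd()
metadata = ProjectMetadata(
    project_path / "pyproject.toml",  # config_file
    "hello_world",                    # package_name
    "Hello world !!!",                # project_name
    project_path,                     # project_path
    project_path / "src",             # source_dir, now before kedro_version
    kedro_version,                    # kedro_version
    "none",                           # kedro_init_version
)
```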
70 changes: 47 additions & 23 deletions kedro-airflow/tests/test_plugin.py
@@ -5,11 +5,8 @@
 
 import pytest
 import yaml
-from kedro.config import ConfigLoader
-from kedro.framework.context import KedroContext
-from pluggy import PluginManager
 
-from kedro_airflow.plugin import _load_config, commands
+from kedro_airflow.plugin import commands
 
 
 @pytest.mark.parametrize(
@@ -69,6 +66,15 @@ def test_airflow_config_params(cli_runner, metadata):
     assert dag_file.read_text() == default_content
     dag_file.unlink()
 
+
+def test_airflow_config_params_cli(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
     # "--params"
     expected_content = "testme"
     command = ["airflow", "create", "--params", "owner=testme", "-j", template_name]
@@ -80,6 +86,15 @@ def test_airflow_config_params(cli_runner, metadata):
     assert dag_file.read_text() == expected_content
     dag_file.unlink()
 
+
+def test_airflow_config_params_from_config(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
     # airflow.yml
     expected_content = "someone else"
     file_name = Path.cwd() / "conf" / "base" / "airflow.yml"
@@ -107,6 +122,16 @@
     assert dag_file.read_text() == expected_content
     file_name.unlink()
 
+
+def test_airflow_config_params_from_config_non_default(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+    default_content = "hello"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
     # random.yml
     expected_content = "yet someone else again"
     file_name = Path.cwd() / "conf" / "base" / "random.yml"
@@ -132,6 +157,15 @@
     dag_file.unlink()
     file_name.unlink()
 
+
+def test_airflow_config_params_env(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
     # env
     expected_content = "again someone else"
     file_name = Path.cwd() / "conf" / "local" / "airflow.yml"
@@ -145,6 +179,15 @@
     assert dag_file.read_text() == expected_content
     dag_file.unlink()
 
+
+def test_airflow_config_params_custom_pipeline(cli_runner, metadata):
+    """Check if config variables are picked up"""
+    dag_name = "hello_world"
+    template_name = "airflow_params.j2"
+    content = "{{ owner | default('hello')}}"
+
+    _create_kedro_airflow_jinja_template(Path.cwd(), template_name, content)
+
     # custom pipeline name
     expected_content = "finally someone else"
     file_name = Path.cwd() / "conf" / "base" / "airflow.yml"
@@ -267,22 +310,3 @@ def test_create_airflow_all_and_pipeline(cli_runner, metadata):
         "Error: Invalid value: The `--all` and `--pipeline` option are mutually exclusive."
         in result.stdout
     )
-
-
-def test_config_loader_backwards_compatibility(cli_runner, metadata):
-    # Emulate ConfigLoader in kedro <= 0.18.2
-    conf_source = Path.cwd() / "conf"
-    config_loader = ConfigLoader(conf_source=conf_source)
-    del config_loader.config_patterns
-    context = KedroContext(
-        config_loader=config_loader,
-        hook_manager=PluginManager(project_name=metadata.project_name),
-        package_name=metadata.package_name,
-        project_path=metadata.project_path,
-    )
-
-    config = _load_config(context)
-    assert config == {
-        "default": {"owner": "again someone else"},
-        "ds": {"owner": "finally someone else"},
-    }
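The monolithic parameter test is split into one test per configuration source (`--params`, `airflow.yml`, a non-default pattern, the `local` env, a custom pipeline), and the `ConfigLoader` backwards-compatibility test is dropped along with its kedro<=0.18 imports. All of the split tests render the same one-line template; a standalone sketch of the behaviour they assert:

```python
from jinja2 import Environment

# `owner` falls back to "hello" unless supplied, e.g. via --params owner=testme.
template = Environment(autoescape=True).from_string("{{ owner | default('hello')}}")
assert template.render() == "hello"
assert template.render(owner="testme") == "testme"
```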
