-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-airlift] remove sections in reference (#25814)
## Summary & Motivation We moved some of the sections from the tutorial to the reference. Let's remove them.
- Loading branch information
Showing
23 changed files
with
1,628 additions
and
2,571 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,51 @@ | ||
This is the main page. | ||
# Airlift | ||
|
||
Airlift is a toolkit for integrating Dagster and Airflow. | ||
|
||
- Observe Airflow instances from within Dagster | ||
- Accelerate the migration of Airflow DAGs to Dagster assets with opinionated tooling. | ||
|
||
--- | ||
|
||
## Compatibility | ||
|
||
### REST API Availability | ||
|
||
Airlift depends on the availability of Airflow’s REST API. Airflow’s REST API was made stable in its 2.0 release (Dec 2020) and was introduced experimentally in 1.10 in August 2018. Currently Airflow requires the availability of the REST API. | ||
|
||
- **OSS:** Stable as of 2.00 | ||
- **MWAA** | ||
- Note: only available in Airflow 2.4.3 or later on MWAA. | ||
- **Cloud Composer:** No limitations as far as we know. | ||
- **Astronomer:** No limitations as far as we know. | ||
|
||
--- | ||
|
||
## Airflow Migration Tutorial | ||
|
||
In this tutorial, we'll use `dagster-airlift` to migrate an Airflow DAG to Dagster assets. | ||
|
||
By the end of the tutorial, you'll understand how to use `dagster-airlift` to enable a migration process that | ||
|
||
- Can be done task-by-task in any order with minimal coordination | ||
- Has task-by-task rollback to reduce risk | ||
- Retains Airflow DAG structure and execution history during the migration | ||
|
||
[Click here to get started](/integrations/airlift/tutorial/overview). | ||
|
||
## References | ||
|
||
<ArticleList> | ||
<ArticleListItem | ||
title="Airlift API reference" | ||
href="/\_apidocs/libraries/dagster-airlift" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
title="Migrating an Entire DAG At Once" | ||
href="/integrations/airlift/full_dag" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
title="Additional Airlift Functionality" | ||
href="/integrations/airlift/reference" | ||
></ArticleListItem> | ||
</ArticleList> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
# Migrating at the DAG-level. | ||
|
||
There may be DAGs for which you want to migrate the entire thing at once rather than on a per-task basis. Some reasons for taking this approach: | ||
|
||
- You're making use of "dynamic tasks" in Airflow, which don't conform neatly to the task mapping protocol we've laid out above. | ||
- You want to make more substantial refactors to the dag structure that don't conform to the existing task structure | ||
|
||
For cases like this, we allow you to map assets to a full DAG. | ||
|
||
## Setup | ||
|
||
This guide utilizes the [Airflow Migration Tutorial](/integrations/airlift/tutorial/overview), and assumes you've completed the initial [setup](/integrations/airlift/tutorial/setup) and [peer](/integrations/airlift/tutorial/peer) stages. This guide will pick up from there.If you've already completed the migration tutorial, we advise downloading a fresh copy and following along with those steps. This guide will perform the observe and migrate steps at the DAG-level instead of on a task-by-task basis, for the `rebuild_customers_list` DAG. | ||
|
||
## Observing DAG-mapped | ||
|
||
When migrating an entire DAG at once, we'll want to create assets which map to the entire DAG. Whereas in the [task-by-task observation step](/integrations/airlift/tutorial/setup), we used the `assets_with_task_mappings` function, we'll instead use the `assets_with_dag_mappings` function. | ||
|
||
For our `rebuild_customers_list` DAG, let's take a look at what the new observation code looks like: | ||
|
||
```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py | ||
import os | ||
from pathlib import Path | ||
|
||
from dagster import AssetExecutionContext, AssetSpec, Definitions | ||
from dagster_airlift.core import ( | ||
AirflowBasicAuthBackend, | ||
AirflowInstance, | ||
assets_with_dag_mappings, | ||
build_defs_from_airflow_instance, | ||
) | ||
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets | ||
|
||
|
||
def dbt_project_path() -> Path: | ||
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") | ||
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" | ||
return Path(env_val) | ||
|
||
|
||
@dbt_assets( | ||
manifest=dbt_project_path() / "target" / "manifest.json", | ||
project=DbtProject(dbt_project_path()), | ||
) | ||
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
yield from dbt.cli(["build"], context=context).stream() | ||
|
||
|
||
# Instead of mapping assets to individual tasks, we map them to the entire DAG. | ||
mapped_assets = assets_with_dag_mappings( | ||
dag_mappings={ | ||
"rebuild_customers_list": [ | ||
AssetSpec(key=["raw_data", "raw_customers"]), | ||
dbt_project_assets, | ||
AssetSpec(key="customers_csv", deps=["customers"]), | ||
], | ||
}, | ||
) | ||
|
||
|
||
defs = build_defs_from_airflow_instance( | ||
airflow_instance=AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8080", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="airflow_instance_one", | ||
), | ||
defs=Definitions( | ||
assets=mapped_assets, | ||
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, | ||
), | ||
) | ||
``` | ||
|
||
Now, instead of getting a materialization when a particular task completes, each mapped asset will receive a materialization when the entire DAG completes. | ||
|
||
## Migrating DAG-mapped assets | ||
|
||
Recall that in the [task-by-task migration step](/integrations/airlift/tutorial/migrate), we "proxy" execution on a task by task basis, which is controlled by a yaml document. For DAG-mapped assets, execution is proxied on a per-DAG basis. Proxying execution to Dagster will require all assets mapped to that DAG be _executable_ within Dagster. Let's take a look at some fully migrated code mapped to DAGs instead of tasks: | ||
|
||
```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py | ||
import os | ||
from pathlib import Path | ||
|
||
from dagster import ( | ||
AssetExecutionContext, | ||
AssetsDefinition, | ||
AssetSpec, | ||
Definitions, | ||
materialize, | ||
multi_asset, | ||
) | ||
from dagster_airlift.core import ( | ||
AirflowBasicAuthBackend, | ||
AirflowInstance, | ||
assets_with_dag_mappings, | ||
build_defs_from_airflow_instance, | ||
) | ||
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets | ||
|
||
# Code also invoked from Airflow | ||
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv | ||
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb | ||
|
||
|
||
def dbt_project_path() -> Path: | ||
env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") | ||
assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" | ||
return Path(env_val) | ||
|
||
|
||
def airflow_dags_path() -> Path: | ||
return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags" | ||
|
||
|
||
def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition: | ||
@multi_asset(name=f"load_{args.table_name}", specs=[spec]) | ||
def _multi_asset() -> None: | ||
load_csv_to_duckdb(args) | ||
|
||
return _multi_asset | ||
|
||
|
||
def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition: | ||
@multi_asset(name=f"export_{args.table_name}", specs=[spec]) | ||
def _multi_asset() -> None: | ||
export_duckdb_to_csv(args) | ||
|
||
return _multi_asset | ||
|
||
|
||
@dbt_assets( | ||
manifest=dbt_project_path() / "target" / "manifest.json", | ||
project=DbtProject(dbt_project_path()), | ||
) | ||
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): | ||
yield from dbt.cli(["build"], context=context).stream() | ||
|
||
|
||
mapped_assets = assets_with_dag_mappings( | ||
dag_mappings={ | ||
"rebuild_customers_list": [ | ||
load_csv_to_duckdb_asset( | ||
AssetSpec(key=["raw_data", "raw_customers"]), | ||
LoadCsvToDuckDbArgs( | ||
table_name="raw_customers", | ||
csv_path=airflow_dags_path() / "raw_customers.csv", | ||
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", | ||
names=["id", "first_name", "last_name"], | ||
duckdb_schema="raw_data", | ||
duckdb_database_name="jaffle_shop", | ||
), | ||
), | ||
dbt_project_assets, | ||
export_duckdb_to_csv_defs( | ||
AssetSpec(key="customers_csv", deps=["customers"]), | ||
ExportDuckDbToCsvArgs( | ||
table_name="customers", | ||
csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", | ||
duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", | ||
duckdb_database_name="jaffle_shop", | ||
), | ||
), | ||
], | ||
}, | ||
) | ||
|
||
|
||
defs = build_defs_from_airflow_instance( | ||
airflow_instance=AirflowInstance( | ||
auth_backend=AirflowBasicAuthBackend( | ||
webserver_url="http://localhost:8080", | ||
username="admin", | ||
password="admin", | ||
), | ||
name="airflow_instance_one", | ||
), | ||
defs=Definitions( | ||
assets=mapped_assets, | ||
resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, | ||
), | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
assert dbt_project_path().exists() | ||
# print(dbt_project_path().absolute()) | ||
Definitions.validate_loadable(defs) | ||
materialize(defs.get_asset_graph().assets_defs) | ||
``` | ||
|
||
Now that all of our assets are fully executable, we can create a simple yaml file to proxy execution for the whole dag: | ||
|
||
```yaml file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/rebuild_customers_list.yaml | ||
proxied: True | ||
``` | ||
We will similarly use `proxying_to_dagster` at the end of our DAG file (the code is exactly the same here as it was for the per-task migration step) | ||
|
||
```python file=../../experimental/dagster-airlift/examples/tutorial-example/tutorial_example/snippets/dags_truncated.py | ||
# Dags file can be found at tutorial_example/airflow_dags/dags.py | ||
from pathlib import Path | ||
from airflow import DAG | ||
from dagster_airlift.in_airflow import proxying_to_dagster | ||
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml | ||
dag = DAG("rebuild_customers_list", ...) | ||
... | ||
# Set this to True to begin the proxying process | ||
PROXYING = False | ||
if PROXYING: | ||
proxying_to_dagster( | ||
global_vars=globals(), | ||
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), | ||
) | ||
``` | ||
|
||
Once the `proxied` bit is flipped to True, we can go to the Airflow UI, and we'll see that our tasks have been replaced with a single task. | ||
|
||
<p align="center"> | ||
|
||
<Image | ||
alt="Before DAG proxying" | ||
src="/images/integrations/airlift/before_dag_override.png" | ||
width={1484} | ||
height={300} | ||
/> <Image | ||
alt="After DAG proxying" | ||
src="/images/integrations/airlift/after_dag_override.png" | ||
width={576} | ||
height={274} | ||
/> | ||
|
||
</p> | ||
|
||
When performing dag-level mapping, we don't preserve task structure in the Airflow dags. This single task will materialize all mapped Dagster assets instead of executing the original Airflow task business logic. | ||
|
||
We can similarly mark `proxied` back to `False`, and the original task structure and business logic will return unchanged. |
Oops, something went wrong.
35a634f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deploy preview for dagster-docs ready!
✅ Preview
https://dagster-docs-1n0qeltpc-elementl.vercel.app
https://master.dagster.dagster-docs.io
Built with commit 35a634f.
This pull request is being automatically deployed with vercel-action