Enable Docs DAG in CI leveraging existing CI connections (#1428)
It seems that the dbt docs DAG in our example DAGs suite was silently
failing to run because the connections specified in the DAG were not
present in our CI environment. I am removing the connection checks,
allowing the previously skipped tasks to execute even when the remote
Airflow connections are unavailable, so misconfiguration now surfaces
as task failures instead of silent skips. Additionally, I've updated
the DAG to use our existing CI credentials and Airflow connections, and
changed the bucket name to the one I created in our object stores. This
change will help catch issues like #1420 earlier. While the fix in #1422
has already addressed #1420, this PR now validates that the fix is
functioning as expected.

closes: #1420
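
With the which_upload branch check gone, the docs tasks simply assume that the aws_s3_conn, azure_abfs_conn, and gcp_gs_conn connections exist in CI. As a minimal sketch (not part of this commit, and assuming a pytest-based CI suite), a sanity check along these lines would make a missing connection fail fast rather than surface only when the DAG runs:

    # Hypothetical CI check, not included in this PR: verify that the Airflow
    # connections the docs DAG now relies on are configured.
    from airflow.hooks.base import BaseHook

    REQUIRED_CONN_IDS = ["aws_s3_conn", "azure_abfs_conn", "gcp_gs_conn"]


    def test_docs_dag_connections_are_configured():
        for conn_id in REQUIRED_CONN_IDS:
            # Raises airflow.exceptions.AirflowNotFoundException if the connection
            # is missing, failing the test instead of silently skipping tasks.
            BaseHook.get_connection(conn_id)
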
pankajkoti authored Dec 27, 2024
1 parent 344913f commit 050ecd4
Showing 1 changed file with 9 additions and 40 deletions.
49 changes: 9 additions & 40 deletions dev/dags/dbt_docs.py
@@ -11,9 +11,6 @@
 from pathlib import Path
 
 from airflow import DAG
-from airflow.decorators import task
-from airflow.exceptions import AirflowNotFoundException
-from airflow.hooks.base import BaseHook
 from pendulum import datetime
 
 from cosmos import ProfileConfig
@@ -27,9 +24,6 @@
 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
 DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
 
-S3_CONN_ID = "aws_docs"
-AZURE_CONN_ID = "azure_docs"
-GCS_CONN_ID = "gcs_docs"
 
 profile_config = ProfileConfig(
     profile_name="default",
@@ -41,32 +35,6 @@
 )
 
 
-@task.branch(task_id="which_upload")
-def which_upload():
-    """Only run the docs tasks if we have the proper connections set up"""
-    downstream_tasks_to_run = []
-
-    try:
-        BaseHook.get_connection(S3_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_aws"]
-    except AirflowNotFoundException:
-        pass
-
-    # if we have an AZURE_CONN_ID, check if it's valid
-    try:
-        BaseHook.get_connection(AZURE_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_azure"]
-    except AirflowNotFoundException:
-        pass
-    try:
-        BaseHook.get_connection(GCS_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_gcs"]
-    except AirflowNotFoundException:
-        pass
-
-    return downstream_tasks_to_run
-
-
 with DAG(
     dag_id="docs_dag",
     start_date=datetime(2023, 1, 1),
@@ -79,24 +47,25 @@ def which_upload():
         task_id="generate_dbt_docs_aws",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=S3_CONN_ID,
-        bucket_name="cosmos-docs",
+        connection_id="aws_s3_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )
 
     generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
         task_id="generate_dbt_docs_azure",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=AZURE_CONN_ID,
-        bucket_name="$web",
+        connection_id="azure_abfs_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )
 
     generate_dbt_docs_gcs = DbtDocsGCSOperator(
         task_id="generate_dbt_docs_gcs",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=GCS_CONN_ID,
-        bucket_name="cosmos-docs",
+        connection_id="gcp_gs_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )
-
-    which_upload() >> [generate_dbt_docs_aws, generate_dbt_docs_azure, generate_dbt_docs_gcs]

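For reference, after this change dev/dags/dbt_docs.py reduces to roughly the following. This is a sketch reconstructed from the diff above; sections the diff does not show (the remaining imports, the full ProfileConfig, and the DAG keyword arguments) are elided with comments, and the DbtDocsS3Operator class name for the AWS task is assumed since the diff only shows its arguments:

    import os
    from pathlib import Path

    from airflow import DAG
    from pendulum import datetime

    from cosmos import ProfileConfig
    # ... other cosmos imports (the DbtDocs* operators) unchanged and not shown in the diff

    DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
    DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

    profile_config = ProfileConfig(
        profile_name="default",
        # ... remaining profile settings unchanged and not shown in the diff
    )

    with DAG(
        dag_id="docs_dag",
        start_date=datetime(2023, 1, 1),
        # ... remaining DAG arguments unchanged and not shown in the diff
    ):
        generate_dbt_docs_aws = DbtDocsS3Operator(  # class name assumed; the diff hunk starts below this line
            task_id="generate_dbt_docs_aws",
            project_dir=DBT_ROOT_PATH / "jaffle_shop",
            profile_config=profile_config,
            connection_id="aws_s3_conn",
            bucket_name="cosmos-ci-docs",
            install_deps=True,
        )

        generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
            task_id="generate_dbt_docs_azure",
            project_dir=DBT_ROOT_PATH / "jaffle_shop",
            profile_config=profile_config,
            connection_id="azure_abfs_conn",
            bucket_name="cosmos-ci-docs",
            install_deps=True,
        )

        generate_dbt_docs_gcs = DbtDocsGCSOperator(
            task_id="generate_dbt_docs_gcs",
            project_dir=DBT_ROOT_PATH / "jaffle_shop",
            profile_config=profile_config,
            connection_id="gcp_gs_conn",
            bucket_name="cosmos-ci-docs",
            install_deps=True,
        )

Without the branch task, the three upload tasks no longer depend on which_upload and run independently whenever the DAG is triggered.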