From 050ecd46a8a78d67311cd042082d20fdf966210b Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Fri, 27 Dec 2024 22:14:21 +0530
Subject: [PATCH] Enable Docs DAG in CI leveraging existing CI connections (#1428)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

It seems that the dbt docs DAG in our example DAGs suite was silently
failing to run because the connections it referenced were not present in
our CI environment.

I am removing the connection checks, so the previously skipped tasks now
execute (and fail visibly) even when the expected Airflow connections are
unavailable. Additionally, I've updated the DAG to use our existing CI
credentials and Airflow connections, and updated the bucket names to the
ones I created in our object stores.

This change will help catch issues like #1420 earlier. While the fix in
#1422 has already addressed #1420, this PR now validates that the fix
works as expected.

closes: #1420
---
 dev/dags/dbt_docs.py | 49 ++++++++------------------------------------
 1 file changed, 9 insertions(+), 40 deletions(-)

diff --git a/dev/dags/dbt_docs.py b/dev/dags/dbt_docs.py
index e26e10d53..1e265cc88 100644
--- a/dev/dags/dbt_docs.py
+++ b/dev/dags/dbt_docs.py
@@ -11,9 +11,6 @@
 from pathlib import Path

 from airflow import DAG
-from airflow.decorators import task
-from airflow.exceptions import AirflowNotFoundException
-from airflow.hooks.base import BaseHook
 from pendulum import datetime

 from cosmos import ProfileConfig
@@ -27,9 +24,6 @@
 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
 DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

-S3_CONN_ID = "aws_docs"
-AZURE_CONN_ID = "azure_docs"
-GCS_CONN_ID = "gcs_docs"

 profile_config = ProfileConfig(
     profile_name="default",
@@ -41,32 +35,6 @@
 )


-@task.branch(task_id="which_upload")
-def which_upload():
-    """Only run the docs tasks if we have the proper connections set up"""
-    downstream_tasks_to_run = []
-
-    try:
-        BaseHook.get_connection(S3_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_aws"]
-    except AirflowNotFoundException:
-        pass
-
-    # if we have an AZURE_CONN_ID, check if it's valid
-    try:
-        BaseHook.get_connection(AZURE_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_azure"]
-    except AirflowNotFoundException:
-        pass
-    try:
-        BaseHook.get_connection(GCS_CONN_ID)
-        downstream_tasks_to_run += ["generate_dbt_docs_gcs"]
-    except AirflowNotFoundException:
-        pass
-
-    return downstream_tasks_to_run
-
-
 with DAG(
     dag_id="docs_dag",
     start_date=datetime(2023, 1, 1),
@@ -79,24 +47,25 @@ def which_upload():
         task_id="generate_dbt_docs_aws",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=S3_CONN_ID,
-        bucket_name="cosmos-docs",
+        connection_id="aws_s3_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )

     generate_dbt_docs_azure = DbtDocsAzureStorageOperator(
         task_id="generate_dbt_docs_azure",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=AZURE_CONN_ID,
-        bucket_name="$web",
+        connection_id="azure_abfs_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )

     generate_dbt_docs_gcs = DbtDocsGCSOperator(
         task_id="generate_dbt_docs_gcs",
         project_dir=DBT_ROOT_PATH / "jaffle_shop",
         profile_config=profile_config,
-        connection_id=GCS_CONN_ID,
-        bucket_name="cosmos-docs",
+        connection_id="gcp_gs_conn",
+        bucket_name="cosmos-ci-docs",
+        install_deps=True,
     )
-
-    which_upload() >> [generate_dbt_docs_aws, generate_dbt_docs_azure, generate_dbt_docs_gcs]
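For context, the hard-coded connection IDs above (aws_s3_conn, azure_abfs_conn,
gcp_gs_conn) must already exist in the CI Airflow environment for the DAG to
succeed. Below is a minimal sketch of one way such connections could be
supplied, assuming the CI job injects them via Airflow's standard
AIRFLOW_CONN_<CONN_ID> environment-variable convention; the URIs, account
names, and credentials shown are placeholders, not the actual CI values.

```python
# Sketch only: placeholder URIs/credentials, assuming connections are injected
# through Airflow's AIRFLOW_CONN_<CONN_ID> environment-variable convention.
import os

from airflow.hooks.base import BaseHook

# AWS connection: access key and secret key encoded in the URI (placeholders).
os.environ["AIRFLOW_CONN_AWS_S3_CONN"] = "aws://AKIA_PLACEHOLDER:secret_placeholder@"

# Azure Blob Storage (wasb) connection for a hypothetical storage account.
os.environ["AIRFLOW_CONN_AZURE_ABFS_CONN"] = (
    "wasb://cosmosci:token_placeholder@cosmosci.blob.core.windows.net"
)

# GCP connection with no inline key, relying on Application Default Credentials.
os.environ["AIRFLOW_CONN_GCP_GS_CONN"] = "google-cloud-platform://"

# Each connection_id used by the docs DAG should now resolve without raising
# AirflowNotFoundException, so none of the upload tasks get skipped.
for conn_id in ("aws_s3_conn", "azure_abfs_conn", "gcp_gs_conn"):
    print(conn_id, BaseHook.get_connection(conn_id).conn_type)
```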