
Commit 99fd154

airflow sample: add explicit edge between preprocess and cleanup
1 parent d02f828 commit 99fd154

3 files changed: +104 -3 lines changed
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
digraph DataPipeline {
    rankdir=TB; // Layout from Top to Bottom
    compound=true;

    // Global node styles for consistent appearance
    node [shape=box, style=filled, fontname="Helvetica", fillcolor="#D8EBF7"]; // Single global fillcolor

    // Nodes for platforms
    node [shape=cylinder, fillcolor="#C1E1C1", style=filled]; // Distinct style for platform nodes
    ApacheAirflow [label="Apache Airflow\n(Orchestration)"];
    GCS [label="Google Cloud Storage"];
    BigQuery [label="BigQuery\n(Data Processing and Storage)"];

    // Restore default node style for operators
    node [shape=box, fillcolor="#D8EBF7"];

    // Data Pipeline Operations (Single BigFrames Operator Approach)
    subgraph cluster_single_operator_approach {
        label="Data Pipeline Operations"; // Label for the single approach
        style=filled;
        fillcolor="#F5F5F5"; // Light grey background for this subgraph

        // Common Data Ingestion Step (orchestrated by Airflow, moves data to GCS)
        download_upload [label="Download CSV from HTTPS\n& Upload to GCS\n(BashOperator)"];

        // Single BigFrames operator for preprocessing, validation, and writing
        bf_to_gbq [label="Process, Validate & Write Data\n(PythonOperator using BigFrames)\n(Reads from GCS, processes in BigQuery,\nwrites to final BigQuery table)", height=1.5];
    }

    // Connect Airflow to the operators it orchestrates
    ApacheAirflow -> download_upload [label="Orchestrates" lhead=cluster_single_operator_approach];

    // Data Flow: Ingestion
    download_upload -> GCS [label="Uploads Data"];

    // Data Flow: Processing (GCS to BigQuery via BigFrames operations)
    GCS -> bf_to_gbq [label="Data Source"]; // bf_to_gbq reads from GCS (via BigQuery engine)
    bf_to_gbq -> BigQuery [label="BigFrames processing\n& writes to final table"]; // BigFrames operates *in* BigQuery and writes final data

    // Dependencies within the Airflow DAG logic
    download_upload -> bf_to_gbq [label="Task Dependency"];

    // Add a general note about BigFrames execution for clarity
    note_bigframes_execution [label="BigFrames operations execute\ndirectly within BigQuery's engine,\nminimizing Airflow worker load.", shape=note, fontsize=10, fillcolor="#FFFACD"];
}
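
This first diagram sketches the single-operator variant: one BashOperator stages the census CSV in GCS, and a single PythonOperator does the BigFrames preprocessing, validation, and write. A minimal sketch of what such a DAG could look like follows; the DAG id, callable name, destination table, bucket, and source URL are illustrative placeholders, not taken from the committed sample.

# Minimal sketch (not the committed sample) of the single-operator layout in the
# diagram above. DAG id, callable name, destination table, bucket, and source
# URL are illustrative placeholders.
import datetime

from airflow import models
from airflow.operators import bash
from airflow.operators.python import PythonOperator

GCS_LOCATION = "gs://<your-composer-bucket>/data/us-census/cc-est2024-agesex-all.csv"

default_args = {
    "start_date": datetime.datetime(2025, 6, 30),
}


def process_validate_write():
    # Import inside the callable so only the worker that runs the task needs BigFrames.
    import bigframes.pandas as bpd

    bpd.options.bigquery.ordering_mode = "partial"

    # The heavy lifting happens in BigQuery's engine, not on the Airflow worker.
    df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
    # ... BigFrames preprocessing and validation would go here ...
    df.to_gbq("your_dataset.us_census_agesex", if_exists="replace", index=False)


with models.DAG("census_single_operator_example", default_args=default_args) as dag:
    download_upload = bash.BashOperator(
        task_id="download_upload",
        bash_command=f"curl -L <census-csv-url> | gsutil cp - {GCS_LOCATION}",
    )
    bf_to_gbq = PythonOperator(
        task_id="bf_to_gbq",
        python_callable=process_validate_write,
    )

    download_upload >> bf_to_gbq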
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
digraph DataPipeline {
    rankdir=TB;
    // ratio=0.75; // Set aspect ratio to 1 for a square shape
    compound=true;

    // Global node styles for consistent appearance
    node [shape=box, style=filled, fontname="Helvetica", fillcolor="#D8EBF7"]; // Single global fillcolor

    // New nodes for platforms
    node [shape=cylinder, fillcolor="#C1E1C1", style=filled]; // Distinct style for platform nodes
    ApacheAirflow [label="Apache Airflow\n(Orchestration)"];
    GCS [label="Google Cloud Storage"];
    BigQuery [label="BigQuery\n(Data Processing and Storage)"];

    // Restore default node style for operators within the subgraph
    node [shape=box, fillcolor="#D8EBF7"];

    // Approach 2: Split PythonOperators with XCom and Cleanup
    subgraph cluster_split_operator_approach {
        label="Data Pipeline Operations"; // More general label for the single approach
        style=filled;
        fillcolor="#F5F5F5"; // Light grey background for this subgraph

        // Common Data Ingestion Step (orchestrated by Airflow, moves data to GCS)
        download_upload [label="Download CSV from HTTPS\n& Upload to GCS\n(BashOperator)"];
        bf_preprocess_op [label="1. Preprocess Data (PythonOperator)\n(Reads data via BigFrames)", height=1.5];
        bf_validate_write_op [label="2. Validate & Write (PythonOperator)\n(Validates & Writes to BigQuery final table)", height=1.5];
        cleanup_preprocess_table_op [label="3. Cleanup Temporary Table\n(BigQueryDeleteTableOperator)\n(trigger_rule='all_done')", shape=box, style="filled,dashed"];
    }

    // Connect Airflow to the operators it orchestrates
    ApacheAirflow -> download_upload [label="Orchestrates" lhead=cluster_split_operator_approach];

    // Data Flow: Ingestion
    download_upload -> GCS [label="Uploads Data"];

    // Data Flow: Processing (GCS to BigQuery via BigFrames operations)
    GCS -> bf_preprocess_op [label="Data Source"]; // bf_preprocess_op reads from GCS (via BigQuery engine)
    bf_preprocess_op -> BigQuery [label="BigFrames processing"]; // BigFrames operates *in* BigQuery
    bf_validate_write_op -> BigQuery [label="BigFrames validation and writes to final table"]; // Writes to final table in BigQuery
    cleanup_preprocess_table_op -> BigQuery [style=dashed, label="Cleans Up"]; // Show BigQuery being used by cleanup

    // Dependencies within the Airflow DAG logic
    download_upload -> bf_preprocess_op [label="Task Dependency"];
    bf_preprocess_op -> bf_validate_write_op [label="Temporary Table ID\n(XCom)"];

    // Cleanup dependencies (trigger_rule="all_done")
    bf_preprocess_op -> cleanup_preprocess_table_op [label="Cleanup Trigger", style=dashed, color=gray, constraint=false];
    bf_validate_write_op -> cleanup_preprocess_table_op [label="Cleanup Trigger", style=dashed, color=gray, constraint=false];

    // Add a general note about BigFrames execution for clarity
    note_bigframes_execution [label="BigFrames operations execute\ndirectly within BigQuery's engine,\nminimizing Airflow worker load.", shape=note, fontsize=10, fillcolor="#FFFACD"];
}
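
In this split variant the preprocess task hands the ID of its temporary BigQuery table to the validate-and-write task, which the diagram labels "Temporary Table ID (XCom)". A rough sketch of how the two PythonOperator callables could exchange that ID is below; the XCom key, destination table, and transformations are illustrative, and GCS_LOCATION is assumed to be the module-level constant from the sample.

# Sketch of the XCom handoff between the two PythonOperator callables
# (steps 1 and 2 in the diagram). The XCom key, the destination table, and the
# exact BigFrames transformations are illustrative.
def preprocess(task_instance):
    import bigframes.pandas as bpd

    bpd.options.bigquery.ordering_mode = "partial"
    bpd.options.compute.maximum_result_rows = 10_000

    df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
    # ... BigFrames preprocessing here ...

    # Without a destination table, BigFrames writes to a temporary BigQuery
    # table with a default expiration and returns its fully qualified ID.
    temp_table_id = df.to_gbq()
    task_instance.xcom_push(key="temp_table_id", value=temp_table_id)


def validate_and_write(task_instance):
    import bigframes.pandas as bpd

    # Pull the temporary table ID pushed by the preprocess task.
    temp_table_id = task_instance.xcom_pull(task_ids="bf_preprocess", key="temp_table_id")
    df = bpd.read_gbq(temp_table_id)
    # ... validation checks here, then write the final table ...
    df.to_gbq("your_dataset.us_census_agesex", if_exists="replace", index=False)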

2025/census-data-airflow-bigframes/census_to_bigquery_split.py

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,6 @@
 """
 
 import datetime
-import json
 
 from airflow import models
 from airflow.operators import bash
@@ -43,7 +42,7 @@
     "start_date": datetime.datetime(2025, 6, 30),
 }
 
-GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"
+GCS_LOCATION = "gs://us-central1-bigframes-orche-0186cdcf-bucket/data/us-census/cc-est2024-agesex-all.csv"
 
 # Define a DAG (directed acyclic graph) of tasks.
 # Any task you create within the context manager is automatically added to the
@@ -69,7 +68,7 @@ def preprocess(task_instance):
 
     import bigframes.pandas as bpd
 
-    GCS_LOCATION = "gs://us-central1-bigframes-orche-5b3ec9ed-bucket/data/us-census/cc-est2024-agesex-all.csv"
+    GCS_LOCATION = "gs://us-central1-bigframes-orche-0186cdcf-bucket/data/us-census/cc-est2024-agesex-all.csv"
 
     bpd.options.bigquery.ordering_mode = "partial"
     bpd.options.compute.maximum_result_rows = 10_000
@@ -152,3 +151,4 @@ def validate_and_write(task_instance):
     )
 
     download_upload >> bf_preprocess >> bf_validate_and_write >> cleanup_preprocess_table
+    bf_preprocess >> cleanup_preprocess_table
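
The new edge makes bf_preprocess a direct upstream of cleanup_preprocess_table, so the dependency on the task that created the temporary table is explicit in the DAG graph rather than implied only through bf_validate_and_write, matching the dashed "Cleanup Trigger" edges in the diagram. A fragment (meant to sit alongside the other task definitions inside the DAG) showing how the cleanup operator and wiring could look, assuming the temporary table ID is pulled from the preprocess task's XCom; the XCom key and template string are illustrative, while BigQueryDeleteTableOperator and trigger_rule='all_done' come from the diagram above:

# Illustrative fragment, not the committed sample.
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator

cleanup_preprocess_table = BigQueryDeleteTableOperator(
    task_id="cleanup_preprocess_table",
    # Pull the temporary table ID pushed by the preprocess task (assumed XCom key).
    deletion_dataset_table="{{ ti.xcom_pull(task_ids='bf_preprocess', key='temp_table_id') }}",
    ignore_if_missing=True,
    # Run once all upstream tasks are done, regardless of success or failure.
    trigger_rule="all_done",
)

# Explicit edges: cleanup waits on both the task that created the temporary
# table and the task that consumed it.
download_upload >> bf_preprocess >> bf_validate_and_write >> cleanup_preprocess_table
bf_preprocess >> cleanup_preprocess_table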
