Skip to content

Commit 665bc8b

Browse files
committed
adjust DAG to work in composer
1 parent 116c110 commit 665bc8b

File tree

1 file changed

+26
-15
lines changed

1 file changed

+26
-15
lines changed

2025/census-data-airflow-bigframes/census_to_bigquery_venv.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# Tested on Cloud Composer 3
16-
#
17-
# For local development:
18-
# pip install 'apache-airflow[google]==2.10.5'
15+
"""
16+
An example DAG for loading data from the US Census using BigQuery DataFrames
17+
(aka bigframes). This DAG uses PythonVirtualenvOperator for environments where
18+
bigframes can't be installed for use from PythonOperator.
19+
20+
I have tested this DAG on Cloud Composer 3 with Apache Airflow 2.10.5.
21+
22+
For local development:
23+
24+
pip install 'apache-airflow[google]==2.10.5' bigframes
25+
"""
1926

2027

2128
import datetime
@@ -36,26 +43,24 @@
3643
}
3744

3845
GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
39-
BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
4046

4147
# Define a DAG (directed acyclic graph) of tasks.
4248
# Any task you create within the context manager is automatically added to the
4349
# DAG object.
4450
with models.DAG(
45-
"census_from_http_to_gcs_once",
51+
"census_from_http_to_bigquery_once",
4652
schedule_interval="@once",
4753
default_args=default_dag_args,
4854
) as dag:
49-
download = bash.BashOperator(
50-
task_id="download",
55+
download_upload = bash.BashOperator(
56+
task_id="download_upload",
5157
# See
5258
# https://www.census.gov/data/tables/time-series/demo/popest/2020s-counties-detail.html
5359
# for file paths and methodologies.
54-
bash_command="wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv",
55-
)
56-
upload = bash.BashOperator(
57-
task_id="upload",
58-
bash_command=f"gcloud storage cp cc-est2024-agesex-all.csv {GCS_LOCATION}",
60+
bash_command=f"""
61+
wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
62+
gcloud storage cp ~/cc-est2024-agesex-all.csv {GCS_LOCATION}
63+
""",
5964
)
6065

6166
def callable_virtualenv():
@@ -65,10 +70,16 @@ def callable_virtualenv():
6570
Importing at the module level ensures that it will not attempt to import the
6671
library before it is installed.
6772
"""
73+
import datetime
74+
75+
import bigframes.pandas as bpd
76+
77+
BIGQUERY_DESTINATION = "swast-scratch.airflow_demo.us_census_by_county2020_to_present"
78+
GCS_LOCATION = "gs://us-central1-bigframes-orche-b70f2a52-bucket/data/us-census/cc-est2024-agesex-all.csv"
79+
6880
#=============================
6981
# Setup bigframes
7082
#=============================
71-
import bigframes.pandas as bpd
7283

7384
# Recommended: Partial ordering mode enables the best performance.
7485
bpd.options.bigquery.ordering_mode = "partial"
@@ -144,4 +155,4 @@ def callable_virtualenv():
144155
)
145156

146157

147-
download >> upload >> bf_to_gbq
158+
download_upload >> bf_to_gbq

0 commit comments

Comments
 (0)