diff --git a/chapters/chapter18/README.md b/chapters/chapter18/README.md
index 0913f17b..6769de99 100644
--- a/chapters/chapter18/README.md
+++ b/chapters/chapter18/README.md
@@ -1,35 +1,54 @@
-# Chapter 13 - GCP
+# Chapter 18 - GCP
 
-Code accompanying the GCP section of Chapter 13 in the book 'Data pipelines with Apache Airflow'.
+Code accompanying Chapter 18 (Airflow on GCP) of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).
 
 ## Contents
 
 This code example contains the following files:
 
+```
-├── Makefile            # Makefile for helping run commands.
+├── Makefile            # Makefile for helping run commands
 ├── dags
 │   └── gcp.py          # The actual DAG.
 ├── docker-compose.yml  # Docker-compose file for Airflow.
 ├── README.md           # This file.
 └── scripts
     └── fetch_data.py   # Helper script for fetching data.
+```
 
 ## Usage
 
-To get started with the code example, first make sure to fetch the required dataset:
+This DAG uses several GCP resources and expects them to be available. An empty template with the variables to be
+filled in is given in `.env.template`. Copy this file to a new file named `.env` and fill in the details:
 
-    make data/ratings
+```
+GCP_PROJECT=[Name of your GCP project]
+GCP_KEY=[JSON key for a service account with permissions to use these resources]
+RATINGS_BUCKET=[Name of a bucket to which ratings data will be uploaded]
+RESULT_BUCKET=[Name of a bucket in which results will be stored]
+BIGQUERY_DATASET=[Name of the BigQuery dataset in which to store ratings data]
+```
 
-Next, use the GCP console (or other tool of choice) to create the following resources for the DAG:
+For this project to work, you need the following resources:
 
-* GCS bucket
+- A GCP project
+- A service account + JSON key to be used by the Airflow tasks
+- A GCS bucket for storing ratings data
+- A GCS bucket for storing result data (this can be the same bucket, using a different object prefix)
+- A BigQuery dataset
 
-How to create these resources (+ what settings to used) is described in the Chapter.
+How to create these resources (and which settings to use) is described in the chapter. Once the required
+resources have been created and you've entered their details in the `.env` file, you can start Airflow to run
+the DAG:
 
-Once the required resources have been created, you can start Airflow to run the DAG:
+```bash
+make airflow-start
+```
 
-    make airflow-start
+The username/password for the Airflow UI are `airflow`/`airflow`.
 
-You can tear down the Airflow resources using
-
-    make airflow-stop
+You can tear down the local Airflow resources using:
+
+```bash
+make airflow-stop
+```
diff --git a/chapters/chapter18/dags/custom/__init__.py b/chapters/chapter18/dags/custom/__init__.py
new file mode 100644
index 00000000..e69de29b
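The usage section above assumes the buckets and the BigQuery dataset already exist. A quick way to sanity-check the values in `.env` before starting Airflow is sketched below; this is not part of the chapter's code and assumes the `google-cloud-storage` and `google-cloud-bigquery` packages are installed locally and that the variables from `.env` have been exported into your shell:

```python
# sanity_check_env.py -- hypothetical helper, not part of the chapter's code.
# Assumes GCP_PROJECT, GCP_KEY, RATINGS_BUCKET, RESULT_BUCKET and BIGQUERY_DATASET
# have been exported from .env, and that GCP_KEY contains the JSON key itself.
import json
import os

from google.cloud import bigquery, storage
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_info(
    json.loads(os.environ["GCP_KEY"])
)
project = os.environ["GCP_PROJECT"]

# Check that both buckets exist and are reachable with the service account.
storage_client = storage.Client(project=project, credentials=credentials)
for bucket_var in ("RATINGS_BUCKET", "RESULT_BUCKET"):
    print(bucket_var, "exists:", storage_client.bucket(os.environ[bucket_var]).exists())

# Check that the BigQuery dataset exists.
bigquery_client = bigquery.Client(project=project, credentials=credentials)
print("BIGQUERY_DATASET:", bigquery_client.get_dataset(os.environ["BIGQUERY_DATASET"]).full_dataset_id)
```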
diff --git a/chapters/chapter18/dags/custom/hooks.py b/chapters/chapter18/dags/custom/hooks.py
new file mode 100644
index 00000000..cb55bde4
--- /dev/null
+++ b/chapters/chapter18/dags/custom/hooks.py
@@ -0,0 +1,148 @@
+import datetime as dt
+
+import requests
+
+from airflow.hooks.base_hook import BaseHook
+
+
+class MovielensHook(BaseHook):
+    """
+    Hook for the MovieLens API (introduced in Chapter 8).
+
+    Abstracts details of the Movielens (REST) API and provides several convenience
+    methods for fetching data (e.g. ratings, users, movies) from the API. Also
+    provides support for automatic retries of failed requests, transparent
+    handling of pagination, authentication, etc.
+
+    Parameters
+    ----------
+    conn_id : str
+        ID of the connection to use to connect to the Movielens API. Connection
+        is expected to include authentication details (login/password) and the
+        host that is serving the API.
+    """
+
+    DEFAULT_SCHEMA = "http"
+    DEFAULT_PORT = 5000
+
+    def __init__(self, conn_id, retry=3):
+        super().__init__(source=None)
+        self._conn_id = conn_id
+        self._retry = retry
+
+        self._session = None
+        self._base_url = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def get_conn(self):
+        """
+        Returns the connection used by the hook for querying data.
+        Should in principle not be used directly.
+        """
+
+        if self._session is None:
+            # Fetch config for the given connection (host, login, etc).
+            config = self.get_connection(self._conn_id)
+
+            if not config.host:
+                raise ValueError(f"No host specified in connection {self._conn_id}")
+
+            schema = config.schema or self.DEFAULT_SCHEMA
+            port = config.port or self.DEFAULT_PORT
+
+            self._base_url = f"{schema}://{config.host}:{port}"
+
+            # Build our session instance, which we will use for any
+            # requests to the API.
+            self._session = requests.Session()
+
+            if config.login:
+                self._session.auth = (config.login, config.password)
+
+        return self._session, self._base_url
+
+    def close(self):
+        """Closes any active session."""
+        if self._session:
+            self._session.close()
+        self._session = None
+        self._base_url = None
+
+    def get_ratings(self, start_date=None, end_date=None, batch_size=100):
+        """
+        Fetches ratings between the given start/end date.
+
+        Parameters
+        ----------
+        start_date : str
+            Start date to start fetching ratings from (inclusive). Expected
+            format is YYYY-MM-DD (equal to Airflow's ds formats).
+        end_date : str
+            End date to fetch ratings up to (exclusive). Expected
+            format is YYYY-MM-DD (equal to Airflow's ds formats).
+        batch_size : int
+            Size of the batches (pages) to fetch from the API. Larger values
+            mean fewer requests, but more data transferred per request.
+        """
+
+        yield from self._get_with_pagination(
+            endpoint="/ratings",
+            params={"start_date": start_date, "end_date": end_date},
+            batch_size=batch_size,
+        )
+
+    def get_ratings_for_month(self, year, month, batch_size=100):
+        """
+        Fetches ratings for a given month.
+
+        Parameters
+        ----------
+        year : int
+            Year to fetch for.
+        month : int
+            Month to fetch for.
+        batch_size : int
+            Size of the batches (pages) to fetch from the API. Larger values
+            mean fewer requests, but more data transferred per request.
+        """
+
+        start_date = dt.datetime(year, month, day=1)
+
+        if month == 12:
+            # Wrap around to January of the next year.
+            end_date = dt.datetime(year + 1, 1, day=1)
+        else:
+            end_date = dt.datetime(year, month + 1, day=1)
+
+        yield from self.get_ratings(
+            start_date=start_date.strftime("%Y-%m-%d"),
+            end_date=end_date.strftime("%Y-%m-%d"),
+            batch_size=batch_size,
+        )
+
+    def _get_with_pagination(self, endpoint, params, batch_size=100):
+        """
+        Fetches records using a GET request with the given url/params,
+        taking pagination into account.
+        """
+
+        session, base_url = self.get_conn()
+        url = base_url + endpoint
+
+        offset = 0
+        total = None
+        while total is None or offset < total:
+            response = session.get(
+                url, params={**params, **{"offset": offset, "limit": batch_size}}
+            )
+            response.raise_for_status()
+            response_json = response.json()
+
+            yield from response_json["result"]
+
+            offset += batch_size
+            total = response_json["total"]
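Because the hook only needs a registered `movielens` connection, it can also be exercised outside of a DAG. A minimal usage sketch (not part of the chapter's code) is shown below; it assumes the MovieLens API container from `docker-compose.yml` is up and reachable on `localhost:5000`, that the script is run from the `dags/` directory so `custom.hooks` is importable, and that the connection URI is supplied via an environment variable:

```python
# scratch.py -- hypothetical usage example, not part of the chapter's code.
import os

from custom.hooks import MovielensHook

# Airflow resolves connections from AIRFLOW_CONN_* environment variables,
# so no metastore entry is needed for a quick local test (assumed URI).
os.environ.setdefault("AIRFLOW_CONN_MOVIELENS", "http://airflow:airflow@localhost:5000")

with MovielensHook(conn_id="movielens") as hook:
    ratings = list(hook.get_ratings_for_month(year=2019, month=1, batch_size=1000))

print(f"Fetched {len(ratings)} ratings")
print(ratings[:3])
```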
+ """ + + session, base_url = self.get_conn() + url = base_url + endpoint + + offset = 0 + total = None + while total is None or offset < total: + response = session.get( + url, params={**params, **{"offset": offset, "limit": batch_size}} + ) + response.raise_for_status() + response_json = response.json() + + yield from response_json["result"] + + offset += batch_size + total = response_json["total"] diff --git a/chapters/chapter18/dags/gcp.py b/chapters/chapter18/dags/gcp.py index 0a6b705e..965c2d8c 100644 --- a/chapters/chapter18/dags/gcp.py +++ b/chapters/chapter18/dags/gcp.py @@ -1,32 +1,77 @@ import datetime +import logging import os +import tempfile +from os import path -from airflow.contrib.operators.bigquery_operator import BigQueryOperator -from airflow.contrib.operators.bigquery_table_delete_operator import ( - BigQueryTableDeleteOperator, -) -from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator -from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator -from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator +import pandas as pd from airflow.models import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryExecuteQueryOperator, + BigQueryDeleteTableOperator, +) +from airflow.providers.google.cloud.transfers.bigquery_to_gcs import ( + BigQueryToGCSOperator, +) +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import ( + GCSToBigQueryOperator, +) +from custom.hooks import MovielensHook dag = DAG( "gcp_movie_ranking", - start_date=datetime.datetime(1995, 1, 1), + start_date=datetime.datetime(year=2019, month=1, day=1), + end_date=datetime.datetime(year=2019, month=3, day=1), schedule_interval="@monthly", + default_args={"depends_on_past": True}, ) -upload_ratings_to_gcs = FileToGoogleCloudStorageOperator( - task_id="upload_ratings_to_gcs", - src="/data/{{ execution_date.year }}/{{ execution_date.strftime('%m') }}.csv", - bucket=os.environ["RATINGS_BUCKET"], - dst="ratings/{{ execution_date.year }}/{{ execution_date.strftime('%m') }}.csv", - dag=dag, +def _fetch_ratings(api_conn_id, gcp_conn_id, gcs_bucket, **context): + year = context["execution_date"].year + month = context["execution_date"].month + + # Fetch ratings from our API. + logging.info(f"Fetching ratings for {year}/{month:02d}") + + api_hook = MovielensHook(conn_id=api_conn_id) + ratings = pd.DataFrame.from_records( + api_hook.get_ratings_for_month(year=year, month=month), + columns=["userId", "movieId", "rating", "timestamp"], + ) + + logging.info(f"Fetched {ratings.shape[0]} rows") + + # Write ratings to temp file. + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_path = path.join(tmp_dir, "ratings.csv") + ratings.to_csv(tmp_path, index=False) + + # Upload file to GCS. 
+ logging.info(f"Writing results to ratings/{year}/{month:02d}.csv") + gcs_hook = GCSHook(gcp_conn_id) + gcs_hook.upload( + bucket_name=gcs_bucket, + object_name=f"ratings/{year}/{month:02d}.csv", + filename=tmp_path, + ) + + +fetch_ratings = PythonOperator( + task_id="fetch_ratings", + python_callable=_fetch_ratings, + op_kwargs={ + "api_conn_id": "movielens", + "gcp_conn_id": "gcp", + "gcs_bucket": os.environ["RATINGS_BUCKET"], + }, + provide_context=True, ) -import_in_bigquery = GoogleCloudStorageToBigQueryOperator( +import_in_bigquery = GCSToBigQueryOperator( task_id="import_in_bigquery", bucket=os.environ["RATINGS_BUCKET"], source_objects=[ @@ -53,7 +98,7 @@ dag=dag, ) -query_top_ratings = BigQueryOperator( +query_top_ratings = BigQueryExecuteQueryOperator( task_id="query_top_ratings", destination_dataset_table=( os.environ["GCP_PROJECT"] @@ -62,19 +107,20 @@ + "." + "rating_results_{{ ds_nodash }}" ), - sql="""SELECT movieid, AVG(rating) as avg_rating, COUNT(*) as num_ratings -FROM airflow.ratings -WHERE DATE(timestamp) <= DATE({{ ds }}) -GROUP BY movieid -ORDER BY avg_rating DESC -""", + sql=( + "SELECT movieid, AVG(rating) as avg_rating, COUNT(*) as num_ratings " + "FROM " + os.environ["BIGQUERY_DATASET"] + ".ratings " + "WHERE DATE(timestamp) <= DATE({{ ds }}) " + "GROUP BY movieid " + "ORDER BY avg_rating DESC" + ), write_disposition="WRITE_TRUNCATE", create_disposition="CREATE_IF_NEEDED", bigquery_conn_id="gcp", dag=dag, ) -extract_top_ratings = BigQueryToCloudStorageOperator( +extract_top_ratings = BigQueryToGCSOperator( task_id="extract_top_ratings", source_project_dataset_table=( os.environ["GCP_PROJECT"] @@ -83,15 +129,15 @@ + "." + "rating_results_{{ ds_nodash }}" ), - destination_cloud_storage_uris=( + destination_cloud_storage_uris=[ "gs://" + os.environ["RESULT_BUCKET"] + "/{{ ds_nodash }}.csv" - ), + ], export_format="CSV", bigquery_conn_id="gcp", dag=dag, ) -delete_result_table = BigQueryTableDeleteOperator( +delete_result_table = BigQueryDeleteTableOperator( task_id="delete_result_table", deletion_dataset_table=( os.environ["GCP_PROJECT"] @@ -104,4 +150,4 @@ dag=dag, ) -upload_ratings_to_gcs >> import_in_bigquery >> query_top_ratings >> extract_top_ratings >> delete_result_table +fetch_ratings >> import_in_bigquery >> query_top_ratings >> extract_top_ratings >> delete_result_table diff --git a/chapters/chapter18/docker-compose.yml b/chapters/chapter18/docker-compose.yml index 056bd5e7..395c2d10 100644 --- a/chapters/chapter18/docker-compose.yml +++ b/chapters/chapter18/docker-compose.yml @@ -3,6 +3,7 @@ version: '3.7' # ====================================== AIRFLOW ENVIRONMENT VARIABLES ======================================= x-environment: &airflow_environment - AIRFLOW__CORE__EXECUTOR=LocalExecutor + - AIRFLOW__CORE__FERNET_KEY=y92Y_MlwZQVzK1ip4YantMUXaLBlE9VMURXVLmjOrR4= - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False - AIRFLOW__CORE__LOAD_EXAMPLES=False - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow @@ -10,11 +11,14 @@ x-environment: &airflow_environment - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True - AIRFLOW__WEBSERVER__RBAC=True - - AIRFLOW_CONN_GCP='google-cloud-platform://?extra__google_cloud_platform__keyfile_dict=${GCP_KEY}' +# - AIRFLOW_CONN_GCP=google-cloud-platform://?extra__google_cloud_platform__keyfile_dict=${GCP_KEY} + - AIRFLOW_CONN_MOVIELENS=http://airflow:airflow@movielens - BIGQUERY_DATASET=${BIGQUERY_DATASET} - GCP_PROJECT=${GCP_PROJECT} - 
diff --git a/chapters/chapter18/docker/airflow-gcp/Dockerfile b/chapters/chapter18/docker/airflow-gcp/Dockerfile
new file mode 100644
index 00000000..3110f25b
--- /dev/null
+++ b/chapters/chapter18/docker/airflow-gcp/Dockerfile
@@ -0,0 +1,5 @@
+ARG AIRFLOW_BASE_IMAGE="apache/airflow:1.10.12-python3.8"
+FROM ${AIRFLOW_BASE_IMAGE}
+
+USER airflow
+RUN pip install --user --no-cache-dir apache-airflow-backport-providers-google==2020.10.5
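The backport providers package installed here is what makes the `airflow.providers.google.*` import paths used in `dags/gcp.py` available on Airflow 1.10. A quick check that could be run inside the built image (a sketch, not part of the chapter's code):

```python
# check_providers.py -- hypothetical check, not part of the chapter's code.
# Verifies that the backport package exposes the import paths used in dags/gcp.py.
import pkg_resources
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

version = pkg_resources.get_distribution("apache-airflow-backport-providers-google").version
print(f"backport providers version: {version}")
print(GCSHook.__module__, GCSToBigQueryOperator.__module__)
```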