43 changes: 31 additions & 12 deletions chapters/chapter18/README.md
@@ -1,35 +1,54 @@
# Chapter 13 - GCP
# Chapter 18 - GCP

Code accompanying the GCP section of Chapter 13 in the book 'Data pipelines with Apache Airflow'.
Code accompanying Chapter 18 (Airflow on GCP) of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

This code example contains the following files:

├── Makefile # Makefile for helping run commands.
```
├── Makefile # Makefile for helping run commands
├── dags
│ └── gcp.py # The actual DAG.
├── docker-compose.yml # Docker-compose file for Airflow.
├── README.md # This file.
└── scripts
└── fetch_data.py # Helper script for fetching data.
```

## Usage

To get started with the code example, first make sure to fetch the required dataset:
This DAG uses several GCP resources and expects them to be available. An empty template with the variables to be
filled in is given in `.env.template`. Copy this file to a new file named `.env` and fill in the details:

make data/ratings
```
GCP_PROJECT=[Name of your GCP project]
GCP_KEY=[JSON key for a service account with permissions to use resources]
RATINGS_BUCKET=[Name of a bucket to which ratings data will be uploaded]
RESULT_BUCKET=[Name of a bucket in which results will be stored]
BIGQUERY_DATASET=[Name of the BigQuery dataset to store ratings data]
```

Next, use the GCP console (or other tool of choice) to create the following resources for the DAG:
For this project to work, you need the following resources:

* GCS bucket
- A GCP project
- A service account + JSON key to be used by Airflow tasks
- A GCS bucket for storing ratings data
- A GCS bucket for storing result data (this can be the same bucket with a different object prefix)
- A BigQuery dataset

How to create these resources (+ what settings to used) is described in the Chapter.
How to create these resources (+ what settings to use) is described in the chapter. Once the required
resources have been created and you've entered the details in the `.env` file, you can start Airflow to run
the DAG:

Once the required resources have been created, you can start Airflow to run the DAG:
```bash
make airflow-start
```

make airflow-start
The username/password for the Airflow UI are `airflow`/`airflow`.

You can tear down the Airflow resources using
You can tear down all resources using:

make airflow-stop
```bash
make airflow-stop
```
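
If you prefer to script the bucket and dataset creation rather than click through the console, a minimal sketch using the Google Cloud Python client libraries could look like the following. This is not part of the book's code: the names are placeholders matching the `.env` variables, and it assumes the `google-cloud-storage` and `google-cloud-bigquery` packages are installed and that you are authenticated (for example with the service account key from `GCP_KEY`). Creating the service account and its JSON key is easiest done via the console, as described in the chapter.

```python
from google.cloud import bigquery, storage

PROJECT_ID = "my-gcp-project"         # placeholder: your GCP project
RATINGS_BUCKET = "my-ratings-bucket"  # placeholder: matches RATINGS_BUCKET in .env
RESULT_BUCKET = "my-result-bucket"    # placeholder: matches RESULT_BUCKET in .env
DATASET = "movie_ratings"             # placeholder: matches BIGQUERY_DATASET in .env

# Create the GCS buckets used for ratings data and query results.
storage_client = storage.Client(project=PROJECT_ID)
for bucket_name in (RATINGS_BUCKET, RESULT_BUCKET):
    storage_client.create_bucket(bucket_name)

# Create the BigQuery dataset that will hold the ratings table.
bigquery_client = bigquery.Client(project=PROJECT_ID)
bigquery_client.create_dataset(DATASET)
```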
Empty file.
148 changes: 148 additions & 0 deletions chapters/chapter18/dags/custom/hooks.py
@@ -0,0 +1,148 @@
import datetime as dt

import requests

from airflow.hooks.base_hook import BaseHook


class MovielensHook(BaseHook):
"""
Hook for the MovieLens API (introduced in Chapter 8).

Abstracts details of the Movielens (REST) API and provides several convenience
methods for fetching data (e.g. ratings, users, movies) from the API. Also
provides support for automatic retries of failed requests, transparent
handling of pagination, authentication, etc.

Parameters
----------
conn_id : str
ID of the connection to use to connect to the Movielens API. Connection
is expected to include authentication details (login/password) and the
host that is serving the API.
"""

DEFAULT_SCHEMA = "http"
DEFAULT_PORT = 5000

def __init__(self, conn_id, retry=3):
super().__init__(source=None)
self._conn_id = conn_id
self._retry = retry

self._session = None
self._base_url = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def get_conn(self):
"""
Returns the connection used by the hook for querying data.
Should in principle not be used directly.
"""

if self._session is None:
# Fetch config for the given connection (host, login, etc).
config = self.get_connection(self._conn_id)

if not config.host:
raise ValueError(f"No host specified in connection {self._conn_id}")

schema = config.schema or self.DEFAULT_SCHEMA
port = config.port or self.DEFAULT_PORT

self._base_url = f"{schema}://{config.host}:{port}"

# Build our session instance, which we will use for any
# requests to the API.
self._session = requests.Session()

if config.login:
self._session.auth = (config.login, config.password)

return self._session, self._base_url

def close(self):
"""Closes any active session."""
if self._session:
self._session.close()
self._session = None
self._base_url = None

def get_ratings(self, start_date=None, end_date=None, batch_size=100):
"""
Fetches ratings between the given start/end date.

Parameters
----------
        start_date : str
            Start date to fetch ratings from (inclusive). Expected
            format is YYYY-MM-DD (equal to Airflow's ds format).
        end_date : str
            End date to fetch ratings up to (exclusive). Expected
            format is YYYY-MM-DD (equal to Airflow's ds format).
        batch_size : int
            Size of the batches (pages) to fetch from the API. Larger values
            mean fewer requests, but more data transferred per request.
"""

yield from self._get_with_pagination(
endpoint="/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)

def get_ratings_for_month(self, year, month, batch_size=100):
"""
Fetches ratings for a given month.

Parameters
----------
year : int
Year to fetch for.
month : int
Month to fetch for.
        batch_size : int
            Size of the batches (pages) to fetch from the API. Larger values
            mean fewer requests, but more data transferred per request.
"""

start_date = dt.datetime(year, month, day=1)

if month == 12:
end_date = dt.datetime(year + 1, month, day=1)
else:
end_date = dt.datetime(year, month + 1, day=1)

yield from self.get_ratings(
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
batch_size=batch_size,
)

def _get_with_pagination(self, endpoint, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""

session, base_url = self.get_conn()
url = base_url + endpoint

offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()

yield from response_json["result"]

offset += batch_size
total = response_json["total"]
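
For reference, a minimal sketch of using the hook above outside of a DAG. This is not part of the chapter's code: it assumes a `movielens` connection can be resolved by Airflow (for example via the metastore or an `AIRFLOW_CONN_MOVIELENS` environment variable) and that the `dags/` directory is on the Python path so `custom.hooks` is importable.

```python
from custom.hooks import MovielensHook

# Assumes a "movielens" connection with host/login/password is configured in Airflow.
with MovielensHook(conn_id="movielens") as hook:
    # get_ratings_for_month is a generator that transparently pages through the API.
    ratings = list(hook.get_ratings_for_month(year=2019, month=1, batch_size=100))

print(f"Fetched {len(ratings)} ratings")
```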
100 changes: 73 additions & 27 deletions chapters/chapter18/dags/gcp.py
@@ -1,32 +1,77 @@
import datetime
import logging
import os
import tempfile
from os import path

from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import (
BigQueryTableDeleteOperator,
)
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
import pandas as pd
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryExecuteQueryOperator,
BigQueryDeleteTableOperator,
)
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
BigQueryToGCSOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
GCSToBigQueryOperator,
)
from custom.hooks import MovielensHook

dag = DAG(
"gcp_movie_ranking",
start_date=datetime.datetime(1995, 1, 1),
start_date=datetime.datetime(year=2019, month=1, day=1),
end_date=datetime.datetime(year=2019, month=3, day=1),
schedule_interval="@monthly",
default_args={"depends_on_past": True},
)


upload_ratings_to_gcs = FileToGoogleCloudStorageOperator(
task_id="upload_ratings_to_gcs",
src="/data/{{ execution_date.year }}/{{ execution_date.strftime('%m') }}.csv",
bucket=os.environ["RATINGS_BUCKET"],
dst="ratings/{{ execution_date.year }}/{{ execution_date.strftime('%m') }}.csv",
dag=dag,
def _fetch_ratings(api_conn_id, gcp_conn_id, gcs_bucket, **context):
year = context["execution_date"].year
month = context["execution_date"].month

# Fetch ratings from our API.
logging.info(f"Fetching ratings for {year}/{month:02d}")

api_hook = MovielensHook(conn_id=api_conn_id)
ratings = pd.DataFrame.from_records(
api_hook.get_ratings_for_month(year=year, month=month),
columns=["userId", "movieId", "rating", "timestamp"],
)

logging.info(f"Fetched {ratings.shape[0]} rows")

# Write ratings to temp file.
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_path = path.join(tmp_dir, "ratings.csv")
ratings.to_csv(tmp_path, index=False)

# Upload file to GCS.
logging.info(f"Writing results to ratings/{year}/{month:02d}.csv")
gcs_hook = GCSHook(gcp_conn_id)
gcs_hook.upload(
bucket_name=gcs_bucket,
object_name=f"ratings/{year}/{month:02d}.csv",
filename=tmp_path,
)


fetch_ratings = PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
op_kwargs={
"api_conn_id": "movielens",
"gcp_conn_id": "gcp",
"gcs_bucket": os.environ["RATINGS_BUCKET"],
},
    provide_context=True,
    dag=dag,
)


import_in_bigquery = GoogleCloudStorageToBigQueryOperator(
import_in_bigquery = GCSToBigQueryOperator(
task_id="import_in_bigquery",
bucket=os.environ["RATINGS_BUCKET"],
source_objects=[
@@ -53,7 +98,7 @@
dag=dag,
)

query_top_ratings = BigQueryOperator(
query_top_ratings = BigQueryExecuteQueryOperator(
task_id="query_top_ratings",
destination_dataset_table=(
os.environ["GCP_PROJECT"]
@@ -62,19 +107,20 @@
+ "."
+ "rating_results_{{ ds_nodash }}"
),
sql="""SELECT movieid, AVG(rating) as avg_rating, COUNT(*) as num_ratings
FROM airflow.ratings
WHERE DATE(timestamp) <= DATE({{ ds }})
GROUP BY movieid
ORDER BY avg_rating DESC
""",
sql=(
"SELECT movieid, AVG(rating) as avg_rating, COUNT(*) as num_ratings "
"FROM " + os.environ["BIGQUERY_DATASET"] + ".ratings "
"WHERE DATE(timestamp) <= DATE({{ ds }}) "
"GROUP BY movieid "
"ORDER BY avg_rating DESC"
),
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_IF_NEEDED",
bigquery_conn_id="gcp",
dag=dag,
)

extract_top_ratings = BigQueryToCloudStorageOperator(
extract_top_ratings = BigQueryToGCSOperator(
task_id="extract_top_ratings",
source_project_dataset_table=(
os.environ["GCP_PROJECT"]
@@ -83,15 +129,15 @@
+ "."
+ "rating_results_{{ ds_nodash }}"
),
destination_cloud_storage_uris=(
destination_cloud_storage_uris=[
"gs://" + os.environ["RESULT_BUCKET"] + "/{{ ds_nodash }}.csv"
),
],
export_format="CSV",
bigquery_conn_id="gcp",
dag=dag,
)

delete_result_table = BigQueryTableDeleteOperator(
delete_result_table = BigQueryDeleteTableOperator(
task_id="delete_result_table",
deletion_dataset_table=(
os.environ["GCP_PROJECT"]
@@ -104,4 +150,4 @@
dag=dag,
)

upload_ratings_to_gcs >> import_in_bigquery >> query_top_ratings >> extract_top_ratings >> delete_result_table
fetch_ratings >> import_in_bigquery >> query_top_ratings >> extract_top_ratings >> delete_result_table
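
To sanity-check `_fetch_ratings` outside of a scheduled run, something along the following lines could work. Again, this is not part of the chapter's code: it assumes the `movielens` and `gcp` connections are configured, that the environment variables from `.env` are exported (importing `gcp.py` reads them at module load), and that `dags/` is on the Python path.

```python
import datetime
import os

from gcp import _fetch_ratings  # assumes dags/ is on the Python path

# Simulate the context of a single monthly run (January 2019).
_fetch_ratings(
    api_conn_id="movielens",
    gcp_conn_id="gcp",
    gcs_bucket=os.environ["RATINGS_BUCKET"],
    execution_date=datetime.datetime(2019, 1, 1),
)
```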