ETL pipelines and data warehouse construction and orchestration for the DS4A Data Engineering capstone project.
For this section of the capstone project, I have used the following resources and tools to build our data pipelines and set up our data warehouse:
- Python 3
- Prefect
- AWS S3
- AWS CloudFormation Stacks
- Databricks Workspaces + Data Warehouse
- Dask (on the Saturn Cloud Data Science platform)
- Prefect account
- Saturn Cloud account
- AWS account
- Databricks account
The code in this example uses Prefect for orchestration (figuring out what to do, and in what order) and a Dask cluster for execution (actually doing those things).
While Prefect is our orchestration tool of choice and our Dask cluster has been configured to execute those orchestration tasks, we still have to set up our system environment to be able to run the scripts that define those tasks, as shown below:
Create a new conda virtual environment:
$ conda create -n data_pipeline_env python=3.9
And activate it:
$ conda activate data_pipeline_env
Then, install the project dependencies:
$ pip install -r requirements.txt
And be sure to set the environment variable that holds our "read-safe" credentials for our Saturn Cloud account (replace the placeholder value with your own username):
$ export SATURN_USERNAME='SATURN_USERNAME'
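Scripts that need this value can then read it from the environment at runtime; a minimal sketch of that lookup:

import os

# Read the Saturn Cloud username exported above (raises KeyError if it is not set)
SATURN_USERNAME = os.environ["SATURN_USERNAME"]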
Finally, we authenticate with Prefect Cloud using our API key:
$ prefect auth login --key PREFECT_API_KEY
Since Prefect organizes flows in association with projects, I created a project called "data-pipeline-warehouse-t23"
within my account, and initialized the Prefect client within the Saturn setup script in preparation for the upcoming tasks:
import prefect
from prefect_saturn import PrefectCloudIntegration

PREFECT_CLOUD_PROJECT_NAME = "data-pipeline-warehouse-t23"
client = prefect.Client()
client.create_project(project_name=PREFECT_CLOUD_PROJECT_NAME)
integration = PrefectCloudIntegration(prefect_cloud_project_name=PREFECT_CLOUD_PROJECT_NAME)
Prefect organizes groups of tasks into "flows" (workflows), which are defined programmatically as Python functions. The following is a breakdown of each task we have defined in the scripts below, covering the extraction, transformation, and loading of the data flowing from the source APIs through our pipeline (a minimal flow sketch follows the list):
extract
: Retrieve the data needed from the respective API
transform
: Transform the returned API response into a dataframe that matches our warehouse table schemas
load
: Load the data into CSVs locally, and into S3 buckets that are synchronized to our Databricks Workspace + Warehouse cluster
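As a minimal sketch of how these three tasks fit together in a Prefect flow (the API endpoint, response shape, and output path here are illustrative placeholders, not the actual pipeline code):

import pandas as pd
import requests
from prefect import Flow, task

@task
def extract(api_url: str) -> dict:
    # Pull the raw JSON payload from the source API
    response = requests.get(api_url)
    response.raise_for_status()
    return response.json()

@task
def transform(payload: dict) -> pd.DataFrame:
    # Flatten the API response into a dataframe matching the warehouse table schema
    return pd.DataFrame(payload["data"])

@task
def load(df: pd.DataFrame, csv_path: str) -> None:
    # Write the dataframe out locally; the S3 / Databricks sync happens downstream
    df.to_csv(csv_path, index=False)

with Flow("treasury-data-flow") as flow:
    raw = extract("https://example.com/api/treasury")  # placeholder endpoint
    df = transform(raw)
    load(df, "data/treasury-data/treasury.csv")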
The integration sections that are commented out instantiate the flows in the Saturn <> Prefect integration, and then hand scheduling off to Prefect Cloud for execution under our project.
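Uncommented, those sections amount to roughly the following, using the integration object created earlier (flow refers to the flow defined in each script):

# Attach Saturn's Dask cluster configuration to the flow, then register it with Prefect Cloud
flow = integration.register_flow_with_saturn(flow=flow)
flow.register(project_name=PREFECT_CLOUD_PROJECT_NAME)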
For the sake of demonstration, you can also create some local directories to house the resultant CSV files:
$ mkdir -p data/treasury-data
$ mkdir -p data/natl-poverty-data
$ mkdir -p data/small-area-poverty-data
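The load step writes into these directories first and then mirrors the same files to S3; a minimal sketch of that step, assuming boto3 and a placeholder bucket name:

import boto3
import pandas as pd

def load_to_s3(df: pd.DataFrame, local_path: str, bucket: str, key: str) -> None:
    # Write the dataframe into the local data/ directory...
    df.to_csv(local_path, index=False)
    # ...then upload the same file to the S3 bucket synced with our Databricks workspace
    s3 = boto3.client("s3")
    s3.upload_file(local_path, bucket, key)

# Example usage (bucket and key are placeholders):
# load_to_s3(df, "data/treasury-data/treasury.csv", "example-capstone-bucket", "treasury-data/treasury.csv")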
When the Prefect flows are successfully created and scheduled in the cloud (via the commented code for each script above), the output will look like this:
In the Prefect Cloud dashboard, the flows will appear like so:
Our S3 bucket is linked to our CloudFormation stack, which houses our Databricks Workspace and Warehouse:
From the Databricks Workspace, every table in the warehouse can be queried via SQL from a single unified dashboard:
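For example, from a notebook attached to that cluster, any of the synced tables can be queried directly (the table name below is illustrative):

# Inside a Databricks notebook, where `spark` and `display` are provided by the runtime
poverty_df = spark.sql("SELECT * FROM small_area_poverty_data LIMIT 10")
display(poverty_df)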