This section explains the following:
- How to create a Kedro node from a Python function
- How to construct a Kedro pipeline from a set of nodes
- How to persist, or save, datasets output from the pipeline by registering them in the data catalog
- How to run the pipeline
The data processing pipeline prepares the data for model building by combining the datasets to create a model input table. The data processing pipeline is made up of the following:
- Two Python files within `src/spaceflights/pipelines/data_processing`:
  - `nodes.py` (for the node functions that form the data processing)
  - `pipeline.py` (to build the pipeline)
- A YAML file, `conf/base/parameters_data_processing.yml`, to define the parameters used when running the pipeline
- `__init__.py` files in the required folders to ensure that Python can import the pipeline
Kedro provides the `kedro pipeline create` command to add the skeleton code for a new pipeline. If you are writing a project from scratch and want to add a new pipeline, run the following from the terminal: `kedro pipeline create <pipeline_name>`. You do **not** need to do this in the spaceflights example as it is already supplied by the starter project.
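For reference, this is roughly what the command looks like for a pipeline named `data_processing`; the exact set of generated files can vary slightly between Kedro versions:

```bash
kedro pipeline create data_processing
# Typically generates skeleton files such as:
#   src/spaceflights/pipelines/data_processing/__init__.py
#   src/spaceflights/pipelines/data_processing/nodes.py
#   src/spaceflights/pipelines/data_processing/pipeline.py
#   conf/base/parameters_data_processing.yml
```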
The hands-on video course walks through data exploration and data processing for the spaceflights data. There are several videos in the playlist that cover the topic starting with the following:
.. youtube:: bZD8N0yv3Fs
:width: 100%
The first step is to preprocess two of the datasets, `companies.csv` and `shuttles.xlsx`. The preprocessing code for the nodes is in `src/spaceflights/pipelines/data_processing/nodes.py` as a pair of functions (`preprocess_companies` and `preprocess_shuttles`). Each takes a raw DataFrame as input, converts the data in several columns to different types, and outputs a DataFrame containing the preprocessed data:
```python
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles
```
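To get a feel for what these functions do, here is a minimal, hypothetical sanity check; the sample values below are made up and are not part of the project data:

```python
import pandas as pd

# A two-row sample using the same column names that preprocess_companies expects
sample = pd.DataFrame({"iata_approved": ["t", "f"], "company_rating": ["90%", "35%"]})

# iata_approved becomes True/False and company_rating becomes 0.90/0.35
print(preprocess_companies(sample))
```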
Next, take a look at `src/spaceflights/pipelines/data_processing/pipeline.py`, which constructs a node for each function defined above and creates a modular pipeline for data processing:
```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import preprocess_companies, preprocess_shuttles

...


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            ...,
        ]
    )
```
Note that the `inputs` statements for `companies` and `shuttles` refer to the datasets defined in `conf/base/catalog.yml`. They are inputs to the `preprocess_companies` and `preprocess_shuttles` functions. Kedro uses the named node inputs (and outputs) to determine interdependencies between the nodes, and hence their execution order.
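To see this dependency resolution for yourself, one option (a sketch for a `kedro ipython` session, assuming the pipeline's `__init__.py` re-exports `create_pipeline` as the starter does) is to build the pipeline and print its nodes, which Kedro returns in a valid execution order:

```python
from spaceflights.pipelines.data_processing import create_pipeline

pipe = create_pipeline()
for n in pipe.nodes:  # nodes are listed in topologically sorted order
    print(n.name, "| inputs:", n.inputs, "| outputs:", n.outputs)
```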
Run the following command in your terminal window to test the node named `preprocess_companies_node`:

```bash
kedro run --nodes=preprocess_companies_node
```
You should see output similar to the following:
```
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
```
You can run the `preprocess_shuttles` node similarly. To test both nodes together as the complete data processing pipeline:

```bash
kedro run
```

You can also run both nodes by naming each in turn, as follows:

```bash
kedro run --nodes=preprocess_companies_node,preprocess_shuttles_node
```
You should see output similar to the following:
```
INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 2 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[08/09/22 16:46:08] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:382
INFO Completed 2 out of 2 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:343
```
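Because this pipeline lives in its own folder, the starter also registers it under its own name. Assuming it is registered as `data_processing` (the folder name), you can restrict a run to just this pipeline:

```bash
kedro run --pipeline=data_processing
```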
Each of the nodes outputs a new dataset (`preprocessed_companies` and `preprocessed_shuttles`). Kedro saves these outputs in Parquet format, using {class}`pandas.ParquetDataset <kedro-datasets:kedro_datasets.pandas.ParquetDataset>`, because they are registered within the Data Catalog, as you can see in `conf/base/catalog.yml`:
```yaml
preprocessed_companies:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataset
  filepath: data/02_intermediate/preprocessed_shuttles.pq
```
If you remove these lines from `catalog.yml`, Kedro still runs the pipeline successfully and automatically stores the preprocessed data, in memory, as temporary Python objects of the {py:class}`~kedro.io.MemoryDataset` class. Once all nodes that depend on a temporary dataset have executed, Kedro clears the dataset and the Python garbage collector releases the memory.
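Persisting the outputs also means you can reload them after a run has finished. As a sketch, in a `kedro ipython` session started from the project root, Kedro provides a ready-made `catalog` object:

```python
# Works only for datasets registered in catalog.yml; in-memory outputs are
# discarded once the run completes.
preprocessed_companies = catalog.load("preprocessed_companies")
preprocessed_companies.head()
```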
The next step adds another node that joins together three datasets (`preprocessed_shuttles`, `preprocessed_companies`, and `reviews`) into a single model input table, which is saved as `model_input_table`.

The code for the `create_model_input_table()` function is in `src/spaceflights/pipelines/data_processing/nodes.py`:
```python
def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.
    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    rated_shuttles = rated_shuttles.drop("id", axis=1)
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
```
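To make the two joins concrete, here is a small, entirely hypothetical example you could run alongside the function above; the real datasets contain more columns than shown:

```python
import pandas as pd

shuttles = pd.DataFrame({"id": [1, 2], "company_id": [10, 20], "price": [100.0, 200.0]})
reviews = pd.DataFrame({"shuttle_id": [1, 2], "review_scores_rating": [97.0, None]})
companies = pd.DataFrame({"id": [10, 20], "company_rating": [0.9, 0.8]})

# Shuttle 2 is removed by dropna() because its review score is missing.
print(create_model_input_table(shuttles, companies, reviews))
```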
The node is created in `src/spaceflights/pipelines/data_processing/pipeline.py`:
```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
```
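If you want to confirm what the assembled pipeline contains from the command line, Kedro's registry commands can help; this is a sketch, and the exact output format varies by version:

```bash
kedro registry list                      # names of all registered pipelines
kedro registry describe data_processing  # nodes in the data_processing pipeline
```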
The following entry in `conf/base/catalog.yml` saves the model input table dataset to file (in `data/03_primary`):

```yaml
model_input_table:
  type: pandas.ParquetDataset
  filepath: data/03_primary/model_input_table.pq
```
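As an optional aside, if you would like Kedro to keep a timestamped copy of this table each time the pipeline runs, you could enable dataset versioning on the same entry; this is not required for the tutorial:

```yaml
model_input_table:
  type: pandas.ParquetDataset
  filepath: data/03_primary/model_input_table.pq
  versioned: true
```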
To test the progress of the example:

```bash
kedro run
```
You should see output similar to the following:
```
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataset)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataset)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataset)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataset)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataset)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataset)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataset)... data_catalog.py:343
```
This section introduces project visualisation using Kedro-Viz, which is a separate package from the standard Kedro installation. To install it into your virtual environment:

```bash
pip install kedro-viz
```

To start Kedro-Viz, enter the following in your terminal:

```bash
kedro viz run
```

This command automatically opens a browser tab to serve the visualisation at http://127.0.0.1:4141/. Explore the visualisation at leisure, and consult the {doc}`Kedro-Viz documentation <kedro-viz:kedro-viz_visualisation>` for more detail.
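If you plan to keep the visualisation open while you continue editing the pipeline, Kedro-Viz can refresh automatically when project files change, assuming your installed version supports the flag:

```bash
kedro viz run --autoreload
```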
To exit, close the browser tab. To regain control of the terminal, press `Ctrl`+`C` (shown as `^C` on macOS) in the terminal window.
.. youtube:: KWqSzbHgNW4
:width: 100%
This is an excellent place to take a breath and summarise what you have seen in the example so far.
- How to create a new Kedro project from a starter and install its dependencies
- How to add three datasets to the project and set up the Kedro Data Catalog
- How to create a data processing pipeline with three nodes to transform and merge the input datasets and create a model input table
- How to persist the output from a pipeline by registering those datasets to the Data Catalog
- How to visualise the project
The next step is to create the data science pipeline for spaceflight price prediction.