Runtime Variables are a set of global variables that can be used by every block. These are useful for storing constants shared by multiple blocks or constants whose value is determined at pipeline runtime (hence runtime variables).
Runtime Variables can be accessed via the **kwargs
parameter in your block function.
from pandas import DataFrame
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
@data_loader
def loader_data(**kwargs) -> DataFrame:
filepath = kwargs.get('filepath')
row_limit = kwargs.get('row_limit')
return pd.read_csv(filepath, nrows=row_limit)
Currently, runtime variables must be of primitive Python types or some basic containers:
- integer
- string
- float
- boolean
- list
- dictionary
- set
Runtime variable names must be a valid Python identifier (i.e., no spaces in name, can't start with a number).
You can create new global variables from the Mage UI through the "Variables" tab in the sidekick. Click the "New" button, and configure your variable's name and value, and press Enter to save.
Mage provides some default variables to give context on the pipeline execution.
execution_date
: A datetime object that the pipeline is executed at.event
: If the pipeline is triggered by event, theevent
variable contains the event payload.
You can execute your pipeline with runtime variables from the command line. First, make sure you installed the package.
pip install mage-ai
Once the package is installed, you can run your pipeline through the command line.
mage run <project_path> <pipeline_uuid> --runtime-vars [name value]...
# example with 2 runtime variables "name" and "ds":
mage run default_repo default_pipeline --runtime-vars name default ds 2022-08-18
If your pipeline is configured to use runtime variables, you can still execute your pipeline outside the code editor. Provide the runtime variables as keyword arguments to mage_ai.run()
:
mage_ai.run('sample_pipeline', 'repos/default_repo', filepath = 'path/to/my/file.csv', row_limit=1000)
A common use of ETL pipelines is to process and analyze daily events. In the case of this example, we will create an ETL pipeline to analyze log messages from a web application. Suppose the following is an example of a log file that our web application produces.
log_date | type | source |
---|---|---|
2022-07-27T20:19:20 | INFO | react-1 |
2022-07-27T19:18:45 | WARNING | express-2 |
2022-07-27T16:35:28 | DEBUG | react-1 |
2022-07-27T10:19:32 | INFO | express-1 |
2022-07-27T07:20:26 | ERROR | express-1 |
2022-07-27T00:42:37 | ERROR | react-1 |
Suppose we want to know the distribution of log types at the end of every day. Using Mage's runtime variables this is made a very simple task:
-
Create a data loader to load all log files modified on a specific date. We will specify the log folder and the date to load logs from using runtime variables which are passed to this block via the
**kwargs
parameter.from datetime import datetime, timedelta, date from pandas import DataFrame, concat, read_csv from pathlib import Path import os if 'data_loader' not in globals(): from mage_ai.data_preparation.decorators import data_loader @data_loader def load_log_data(**kwargs) -> DataFrame: start = datetime.fromisoformat(kwargs.get('current_date')) end = start + timedelta(days=1) logpath = Path(kwargs.get('log_folder')) logs = [] for file in logpath.iterdir(): print(file) modification_time = datetime.fromtimestamp(os.path.getmtime(file)) if modification_time >= start and modification_time < end: df = read_csv(file) logs.append(df) return concat(logs, axis=0)
Note: This code ignores the edge case of a log file that spills over between days. Since the modification date is used instead of the creation date, a log file that is modified between days will only be considered in the latter day.
-
Calculate the distribution of log types over this date
from pandas import DataFrame from os import path import pandas as pd if 'transformer' not in globals(): from mage_ai.data_preparation.decorators import transformer @transformer def extract_statistics(df: DataFrame, **kwargs) -> DataFrame: now = kwargs.get('current_date') count = df['type'].value_counts() count = DataFrame({now: count}).T return count
The result of this transformer is a new data frame that looks like below:
DEBUG ERROR INFO WARNING 2022-07-28 816 765 828 871
This data can then be ingested into a log statistics database.
Key: As the current date and log folder are not hardcoded in the pipeline but instead provided as a runtime variable, your pipeline code remains reusable without having to change any code. Every day, the pipeline can be ran using mage_ai.run()
, providing the current date and log folder as keyword arguments:
mage_ai.run('log_stats_ingestion', 'repos/default_repo', current_date='2022-07-29', log_folder='logs/webapp')
Consider the following sample data tracking the launch angle (in degrees) and vertical velocity (in meters per second) of model rocket tests:
launch_id | angle | vertical_velocity |
---|---|---|
1 | 28.4 | 61.34 |
2 | 14.4 | 32.03 |
3 | 61.4 | 113.19 |
4 | 44.2 | 89.96 |
5 | 39.5 | 82.00 |
6 | 79.3 | 126.73 |
7 | 74.9 | 124.51 |
8 | 38.1 | 79.6 |
Suppose we want to convert the launch angle from degrees to radians in our pipeline. The example below uses the pi
runtime variable (passed in through **kwargs
) to convert the degrees column of some input data frame to a radians. This allows us to control the precision with which we want to store pi
.
from pandas import DataFrame
from os import path
if 'transformer' not in globals():
from mage_ai.data_preparation.decorators import transformer
@transformer
def convert_to_radians(df: DataFrame, **kwargs) -> DataFrame:
pi = kwargs.get('pi')
df['angle_radians'] = df['angle'] * pi / 180
return df
Suppose also want to convert the velocity from meters per second to kilometers per hour by multiplying by the constant 3.6. Using the runtime variable conversion_factor
(again passed in through **kwargs
), the vertical velocity in kilometers per hour can be computed:
from pandas import DataFrame
from os import path
if 'transformer' not in globals():
from mage_ai.data_preparation.decorators import transformer
@transformer
def convert_velocity(df: DataFrame, **kwargs) -> DataFrame:
conversion_factor = kwargs.get('conversion_factor')
df['converted_vertical_velocity'] = df['vertical_velocity'] * conversion_factor
return df
Now that your pipeline is complete, you can run your pipeline using mage_ai.run()
. Specify values for runtime variables as keyword arguments to this function:
mage_ai.run('model_rocket_data_ingestion', 'repos/default_repo', pi=3.1415, conversion_factor=3.6)
Suppose after creating your pipeline, you instead want to store your velocity in miles per hour instead of kilometers per hour. As the conversion factor is a runtime variable, you don't have to edit your pipeline - you just have to change the conversion factor which your pipeline is run with!
mage_ai.run('model_rocket_data_ingestion', 'repos/default_repo', pi=3.1415, conversion_factor=2.24)