Mage provides data loading clients that simplify loading and exporting data in your pipelines, allowing you to spend more time analyzing and transforming your data for ML tasks. Currently, Mage includes clients for the following data sources:
- AWS Redshift
- AWS S3
- Azure Blob Storage
- Filesystem
- Google BigQuery
- Google Cloud Storage
- PostgreSQL Database
- Snowflake
Mage's data loading clients fall into two categories:
- File-Loading Clients - import/export data between files and your pipeline. Includes both local filesystem storage and external filesystem storage (like AWS S3)
- Database Clients - import data from an external database into your pipeline and export data frames back into that database. These database clients include the `execute` method, which executes your queries in the connected database. The Google BigQuery client follows this structure.
  - A subcategory of database clients are connection-based clients, which wrap a connection to the database. This connection is used to execute transactions (sets of queries) on the database, which are either committed (saved to the database) or rolled back (discarded). Clients for PostgreSQL, Redshift, and Snowflake follow this structure.
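The commit and rollback behavior of connection-based clients can be illustrated with a minimal sketch. It uses Python's built-in `sqlite3` module as a stand-in for a remote database, so none of the names below are part of Mage's API; the table and values are made up for the demonstration.

```python
import sqlite3

# In-memory SQLite database used purely as a stand-in for a remote database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE passengers (name TEXT, survived INTEGER)")
conn.commit()

# First transaction: insert a row and commit, so the change is saved.
conn.execute("INSERT INTO passengers VALUES ('Alice', 1)")
conn.commit()

# Second transaction: insert a row, then roll back, so the change is discarded.
conn.execute("INSERT INTO passengers VALUES ('Bob', 0)")
conn.rollback()

rows = conn.execute("SELECT name FROM passengers").fetchall()
print(rows)  # only the committed row remains
conn.close()
```

Only the row inserted before `commit()` survives; the row followed by `rollback()` never reaches the table.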
While traditional Pandas IO procedures can be used to load files into your pipeline, Mage provides the `mage_ai.io.file.FileIO` client as a convenience wrapper.
The following code uses the `load` function of the `FileIO` client to load the Titanic survival dataset from a CSV file into a Pandas DataFrame for use in your pipeline. All data loaders can be initialized with the `verbose=True` parameter, which prints the current action the data loading client is performing. This parameter defaults to `False`.
from mage_ai.io.file import FileIO
loader = FileIO(verbose=True)
df = loader.load(
    'https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv'
)
Then the `export` function is used to save this data frame back to a file, this time in JSON format:
loader.export(df, './titanic_survival.json', orient='records')
Any formatting settings (such as specifying the data frame orient) can be passed as keyword arguments to `load` and `export`. These arguments are passed to the Pandas IO procedures for the requested file format, enabling fine-grained control over how your data is loaded and exported.
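To make the effect of the `orient` setting concrete, the sketch below rebuilds two JSON layouts by hand using only the standard library. It mirrors the shapes that pandas documents for `orient='records'` and `orient='columns'`; the sample data is invented, and no Mage or pandas code is involved.

```python
import json

# A tiny table stored column-wise, similar to how a data frame holds columns.
columns = {"name": ["Alice", "Bob"], "survived": [1, 0]}
n_rows = len(next(iter(columns.values())))

# "records" layout: one JSON object per row.
records = [
    {col: values[i] for col, values in columns.items()} for i in range(n_rows)
]

# "columns" layout: one JSON object per column, keyed by row index.
by_column = {
    col: {str(i): value for i, value in enumerate(values)}
    for col, values in columns.items()
}

print(json.dumps(records))
print(json.dumps(by_column))
```

The records layout is usually the easier one to consume row by row, which is why the example above passes `orient='records'`.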
As the data loader was constructed with the `verbose` parameter set to `True`, the above operations would print the following output describing the actions of the data loader.
FileIO initialized
├─ Loading data frame from 'https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv'...DONE
└─ Exporting data frame to './titanic_survival.json'...DONE
See the FileIO API to learn more about loading data from the filesystem.
Loading data from a Snowflake data warehouse is made easy using the `mage_ai.io.snowflake.Snowflake` data loading client. In order to authenticate access to a Snowflake warehouse, the client requires the associated Snowflake credentials:
- Account Username
- Account Password
- Account ID (including your region) (Ex: example.us-west-2)
These parameters can be manually specified as input to the data loading client (see Snowflake API). However, we recommend using a Configuration Loader to handle loading these secrets. If you used `mage init` to create your project repository, you can store these values in your `io_config.yaml` file and use `mage_ai.io.config.ConfigFileLoader` to construct the data loader client.
An example `io_config.yaml` file in this instance would be:
default:
  SNOWFLAKE_USER: my_username
  SNOWFLAKE_PASSWORD: my_password
  SNOWFLAKE_ACCOUNT_ID: example.us-west-2
with which a Snowflake data loading client can be constructed as:
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
config = ConfigFileLoader('io_config.yaml', 'default')
loader = Snowflake.with_config(config, verbose=True)
To learn more about using configuration settings, see Configuration Settings.
Then the following code uses the functions:
- `execute()` - executes an arbitrary query on your data warehouse. In this case, the warehouse, database, and schema to use are selected.
- `load()` - loads the results of a `SELECT` query into a Pandas DataFrame.
- `export()` - stores a data frame as a table in your data warehouse. If the table exists, then the data is appended by default (other behavior can be configured, see Snowflake API). If the table doesn't exist, then the table is created with the given schema name and table name (the table data types are inferred from the Python data types).
with loader:
    loader.execute('USE WAREHOUSE my_warehouse;')
    loader.execute('USE DATABASE my_database;')
    loader.execute('USE SCHEMA my_schema;')
    df = loader.load('SELECT * FROM test_table;')
    loader.export(df, 'my_schema', 'test_table')
The `loader` object manages a direct connection to your Snowflake data warehouse, so it is important to make sure that your connection is closed once your operations are completed. You can manually use `loader.open()` and `loader.close()` to open and close the connection to your data warehouse, or automatically manage the connection with a context manager.
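The relationship between `open()`, `close()`, and the `with` statement can be sketched with a small class. `ManagedConnection` is a hypothetical stand-in built on `sqlite3`, not Mage's Snowflake client; it only illustrates how a context manager can guarantee that the connection is closed, even if the body raises.

```python
import sqlite3


class ManagedConnection:
    """Hypothetical stand-in for a connection-based client (not Mage's class)."""

    def __init__(self, database: str = ":memory:"):
        self.database = database
        self.conn = None

    def open(self) -> None:
        self.conn = sqlite3.connect(self.database)

    def close(self) -> None:
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        # Always close the connection; returning False lets exceptions propagate.
        self.close()
        return False


client = ManagedConnection()
with client:
    value = client.conn.execute("SELECT 1 + 1").fetchone()[0]

print(value, client.conn is None)
```

With manual `open()` and `close()` calls, an exception between the two would leave the connection open unless the calls are wrapped in `try`/`finally`; the `with` form handles that case automatically.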
To learn more about loading data from Snowflake, including member functions and usage, see the Snowflake API.
This section covers the API for using the following data loaders.
mage_ai.io.redshift.Redshift
Handles data transfer between a Redshift cluster and the Mage app. Mage uses temporary credentials to authenticate access to a Redshift cluster. There are two ways to specify these credentials:
- Pre-generate temporary credentials and specify them in the configuration settings. Add the following keys to the configuration settings for `Redshift` to use the temporary credentials:

  REDSHIFT_DBNAME: Name of Redshift database to connect to
  REDSHIFT_HOST: Redshift Cluster hostname
  REDSHIFT_PORT: Redshift Cluster port. Optional, defaults to 5439
  REDSHIFT_TEMP_CRED_USER: Redshift temp credentials username
  REDSHIFT_TEMP_CRED_PASSWORD: Redshift temp credentials password

- Provide an IAM Profile to automatically generate temporary credentials for the connection. The IAM profile is read from `~/.aws/` and is used with the `GetClusterCredentials` endpoint to generate temporary credentials. Add the following keys to the configuration settings for `Redshift` to generate temporary credentials:

  REDSHIFT_DBNAME: Name of Redshift database to connect to
  REDSHIFT_DBUSER: Redshift database user to generate credentials for
  REDSHIFT_CLUSTER_ID: Redshift cluster ID
  REDSHIFT_IAM_PROFILE: Name of the IAM profile to generate temp credentials with

  If an IAM profile is not set up using `aws configure`, manually specify the AWS credentials in the configuration settings as well:

  AWS_ACCESS_KEY_ID: AWS Access Key ID credential
  AWS_SECRET_ACCESS_KEY: AWS Secret Access Key credential
  AWS_SESSION_TOKEN: AWS Session Token (used to generate temp DB credentials)
  AWS_REGION: AWS Region
__init__(**kwargs)
- Args:
  - `verbose`: Enables verbose output. Defaults to `False`.

  See other keyword arguments in Redshift's Python connector docs.
- `conn` (`redshift_connector.Connection`) - the underlying Redshift Connection object.
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Redshift
Initializes Redshift client from configuration settings.
- Args:
  - `config (BaseConfigLoader)`: Configuration loader object.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`Redshift`) Redshift data loading client constructed using this method
with_temporary_credentials - with_temporary_credentials(database: str, host: str, user: str, password: str, port: int = 5439, **kwargs) -> Redshift
Creates a Redshift data loader with temporary database credentials.
- Args:
  - `database (str)`: Name of the database to connect to.
  - `host (str)`: The hostname of the Redshift cluster which the database belongs to.
  - `user (str)`: Temporary credentials username for use in authentication.
  - `password (str)`: Temporary credentials password for use in authentication.
  - `port (int, optional)`: Port number of the Redshift cluster. Defaults to 5439.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`Redshift`) Redshift data loading client constructed using this method
with_iam - with_iam(cluster_identifier: str, database: str, db_user: str, profile: str, **kwargs) -> Redshift
Creates a Redshift data loader using an IAM profile from `~/.aws`.
The IAM Profile settings can also be manually specified as keyword arguments to this constructor, but this is not recommended. If credentials are manually specified, the region of the Redshift cluster must also be specified.
- Args:
  - `cluster_identifier (str)`: Identifier of the cluster to connect to.
  - `database (str)`: The database to connect to within the specified cluster.
  - `db_user (str)`: Database username.
  - `profile (str, optional)`: The profile to use from the stored credentials file. Defaults to 'default'.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`Redshift`) Redshift data loading client constructed using this method
close - close()
Closes connection to the Redshift cluster specified in the loader configuration.
commit - commit()
Saves all changes made to the database since the last transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected Redshift cluster. Any changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `query_string (str)`: The query to execute on the Redshift cluster.
  - `**kwargs`: Additional parameters to pass to the query. See the `redshift-connector` docs for configuring query parameters.
export - export(df: DataFrame, table_name: str) -> None
Exports a Pandas data frame to a Redshift cluster under the specified table. The changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `df (DataFrame)`: Data frame to export to the database in the Redshift cluster.
  - `table_name (str)`: Name of the table to export the data to. Table must already exist.
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function loads at most 100,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse using `execute`.
- Args:
  - `query_string (str)`: Query to fetch a table or subset of a table.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `*args, **kwargs`: Additional parameters to send to the query, including parameters for use with format strings. See the `redshift-connector` docs for configuring query parameters.
- Returns: (`DataFrame`) Data frame containing the queried data.
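The row cap described for `load` can be pictured as wrapping the caller's query in an outer `LIMIT`. The sketch below uses Python's built-in `sqlite3` module as a stand-in for a warehouse; `load_with_limit` is a hypothetical helper, and this is one plausible approach rather than a description of how the Mage clients apply the limit.

```python
import sqlite3


def load_with_limit(conn, query_string: str, limit: int = 100_000):
    """Hypothetical helper: cap the rows returned by an arbitrary SELECT."""
    # Strip a trailing semicolon so the query can be nested as a subquery.
    query = query_string.strip().rstrip(";")
    cursor = conn.execute(f"SELECT * FROM ({query}) AS capped LIMIT ?", (limit,))
    return cursor.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(10)])

rows = load_with_limit(conn, "SELECT id FROM events ORDER BY id;", limit=3)
print(len(rows))  # at most 3 rows come back, although the table holds 10
```

Pushing the limit into the query keeps the extra rows from ever leaving the database, which matters more than trimming the result after it has been transferred.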
open - open()
Opens a connection to the Redshift cluster specified in the loader configuration.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the selected database in the Redshift cluster. Sample is not guaranteed to be random.
- Args:
  - `schema (str)`: The schema to select the table from.
  - `size (int)`: The number of rows to sample. Defaults to 100,000.
  - `table (str)`: The table to sample from in the connected database.
- Returns: (`DataFrame`) Sampled data from the table.
mage_ai.io.s3.S3
Handles data transfer between an S3 bucket and the Mage app. The `S3` client supports importing and exporting with the following file formats:
- CSV
- JSON
- Parquet
- HDF5
If an IAM profile is not set up using `aws configure`, then AWS credentials for accessing the S3 bucket can be manually specified through either Mage's configuration loader system or through keyword arguments in the constructor (see constructor).
If using configuration settings, specify the following keys:
AWS_ACCESS_KEY_ID: AWS Access Key ID credential
AWS_SECRET_ACCESS_KEY: AWS Secret Access Key credential
AWS_REGION: AWS Region
__init__(self, verbose: bool)
Initializes the S3 data loading client.
- Args:
  - `verbose (bool)`: Enables verbose output printing. Defaults to False.
  - If an IAM profile is not set up using `aws configure` and Mage's configuration loader is not used, then specify your AWS credentials through the following keyword arguments:
    - `aws_access_key_id (str)`: AWS Access Key ID credential
    - `aws_secret_access_key (str)`: AWS Secret Access Key credential
    - `aws_region (str)`: Region associated with AWS IAM profile
with_config - with_config(config: BaseConfigLoader, **kwargs) -> S3
Creates S3 data loading client from configuration settings.
- Args:
  - `config (BaseConfigLoader)`: Configuration loader object.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`S3`) The data loader constructed using this method
export - export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None:
Exports data frame to an S3 bucket.
If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is 'storage/my_data_frame.hdf5', the key would be 'my_data_frame'. This can be overridden using the `key` keyword argument.
- Args:
  - `df (DataFrame)`: Data frame to export.
  - `bucket_name (str)`: Name of the bucket to export the data frame to.
  - `object_key (str)`: Object key in the S3 bucket to export the data frame to.
  - `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
  - `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format at:

    Format | Pandas Writer |
    ---|---|
    CSV | DataFrame.to_csv |
    JSON | DataFrame.to_json |
    Parquet | DataFrame.to_parquet |
    HDF5 | DataFrame.to_hdf |
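The default HDF5 key described for `export` is the filename stem, which can be checked with the standard library. This is only an illustration of what "stem" means for the example path; `PurePosixPath` is used here on the assumption that object keys use forward slashes.

```python
from pathlib import PurePosixPath

object_key = "storage/my_data_frame.hdf5"

# The stem is the final path component without its extension.
default_key = PurePosixPath(object_key).stem
print(default_key)  # my_data_frame
```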
load - load(bucket_name: str, object_key: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame
Loads data from an object in an S3 bucket into a Pandas data frame. This function loads at most 100,000 rows of data from the specified file (this limit is configurable).
- Args:
  - `bucket_name (str)`: Name of the bucket to load data from.
  - `object_key (str)`: Key of the object in the S3 bucket to load data from.
  - `format (FileFormat | str, optional)`: Format of the file to load the data frame from. Defaults to `None`, in which case the format is inferred.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `**kwargs`: Additional keyword arguments to pass to the file reader. See possible arguments by file format at:

    Format | Pandas Reader |
    ---|---|
    CSV | read_csv |
    JSON | read_json |
    Parquet | read_parquet |
    HDF5 | read_hdf |
- Returns: (`DataFrame`) Data frame object loaded from data in the specified file.
mage_ai.io.file.FileIO
Handles data transfer between the filesystem and the Mage app. The `FileIO` client currently supports importing and exporting with the following file formats:
- CSV
- JSON
- Parquet
- HDF5
__init__(self, verbose: bool)
Initializes the FileIO data loading client.
- Args:
  - `verbose (bool)`: Enables verbose output printing. Defaults to False.
export - export(df: DataFrame, filepath: os.PathLike, format: FileFormat | str = None, **kwargs) -> None:
Exports the input data frame to the file specified.
If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is 'storage/my_data_frame.hdf5', the key would be 'my_data_frame'. This can be overridden using the `key` keyword argument.
- Args:
  - `df (DataFrame)`: Data frame to export.
  - `filepath (os.PathLike)`: Filepath to export the data frame to.
  - `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
  - `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format at:

    Format | Pandas Writer |
    ---|---|
    CSV | DataFrame.to_csv |
    JSON | DataFrame.to_json |
    Parquet | DataFrame.to_parquet |
    HDF5 | DataFrame.to_hdf |
load - load(filepath: os.PathLike, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame
Loads the data frame from the filepath specified. This function loads at most 100,000 rows of data from the specified file (this limit is configurable).
- Args:
  - `filepath (os.PathLike)`: Filepath to load the data frame from.
  - `format (FileFormat | str, optional)`: Format of the file to load the data frame from. Defaults to `None`, in which case the format is inferred.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `**kwargs`: Additional keyword arguments to pass to the file reader. See possible arguments by file format at:

    Format | Pandas Reader |
    ---|---|
    CSV | read_csv |
    JSON | read_json |
    Parquet | read_parquet |
    HDF5 | read_hdf |
- Returns: (`DataFrame`) Data frame object loaded from data in the specified file.
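When `format` is left as `None`, the format is inferred. One plausible way to do that is from the file extension, as sketched below. `SketchFileFormat` and `infer_format` are hypothetical names invented for this illustration; the actual members of Mage's `FileFormat` and its inference rules are not shown in this document and may differ.

```python
from enum import Enum
from pathlib import Path


class SketchFileFormat(Enum):
    """Hypothetical enum covering the four formats listed in this section."""

    CSV = "csv"
    JSON = "json"
    PARQUET = "parquet"
    HDF5 = "hdf5"


def infer_format(filepath) -> SketchFileFormat:
    """Guess the file format from the extension (assumed inference rule)."""
    suffix = Path(filepath).suffix.lower().lstrip(".")
    try:
        return SketchFileFormat(suffix)
    except ValueError:
        raise ValueError(
            f"Cannot infer file format from {str(filepath)!r}"
        ) from None


print(infer_format("./titanic_survival.json").value)  # json
```

Under this assumed rule, a file with an unrecognized or missing extension cannot be inferred, which is the case where passing `format` explicitly becomes necessary.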
WIP
WIP
mage_ai.io.bigquery.BigQuery
Handles data transfer between a BigQuery data warehouse and the Mage app.
Authentication with a Google BigQuery warehouse requires specifying the service account key for the service account that has access to the BigQuery warehouse. There are four ways to provide this service key:
- Define the `GOOGLE_APPLICATION_CREDENTIALS` environment variable holding the filepath to your service account key
- Define the `GOOGLE_SERVICE_ACC_KEY_FILEPATH` key with your configuration loader or the `path_to_credentials` keyword argument with the client constructor holding the filepath to your service account key
- Define the `GOOGLE_SERVICE_ACC_KEY` key with your configuration loader or the `credentials_mapping` keyword argument with the client constructor holding a mapping sharing the same contents as your service key
  - If using a configuration file, be careful to wrap your service key values in quotes so the YAML parser reads the settings correctly
- Manually pass the `google.oauth2.service_account.Credentials` object with the keyword argument `credentials`
__init__(self, **kwargs)
Initializes the BigQuery data loading client.
- Args:
  - `verbose (bool)`: Enables verbose output printing. Defaults to `False`.
  - `credentials_mapping (Mapping[str, str])`: Mapping object corresponding to your service account key. See instructions above on when to use this keyword argument.
  - `path_to_credentials (str)`: Filepath to service account key. See instructions above on when to use this keyword argument.

  All other keyword arguments can be found in the Google BigQuery Python Client docs.
with_config - with_config(config: BaseConfigLoader, **kwargs) -> BigQuery
Creates BigQuery data loading client from configuration settings.
- Args:
  - `config (BaseConfigLoader)`: Configuration loader object.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`BigQuery`) BigQuery data loading client constructed using this method
with_credentials_file - with_credentials_file(cls, path_to_credentials: str, **kwargs) -> BigQuery
Constructs BigQuery data loader using the file containing the service account key.
- Args:
  - `path_to_credentials (str)`: Path to the credentials file.
  - `**kwargs`: Additional parameters to pass to the BigQuery client constructor.
- Returns: (`BigQuery`) BigQuery data loading client constructed using this method
with_credentials_object - with_credentials_object(cls, credentials: Mapping[str, str], **kwargs) -> BigQuery
Constructs BigQuery data loader using manually specified service account key credentials.
- Args:
  - `credentials (Mapping[str, str])`: Mapping containing the same key-value pairs as a service account key.
  - `**kwargs`: Additional parameters to pass to the BigQuery client constructor.
- Returns: (`BigQuery`) BigQuery data loading client constructed using this method
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected BigQuery warehouse.
- Args:
  - `query_string (str)`: Query to execute on the BigQuery warehouse.
  - `**kwargs`: Additional arguments to pass to the query, such as query configurations. See Client.query() docs for additional arguments.
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None
Exports a data frame to a Google BigQuery warehouse. If the table doesn't exist, it is automatically created.
- Args:
  - `df (DataFrame)`: Data frame to export.
  - `table_id (str)`: ID of the table to export the data frame to, in the format `"your-project.your_dataset.your_table_name"`. If this table exists, the table schema must match the data frame schema. If this table doesn't exist, the table schema is automatically inferred.
  - `if_exists (str)`: Specifies the export policy if the table exists. Either
    - `'fail'`: throw an error.
    - `'replace'`: drops existing table and creates new table of same name.
    - `'append'`: appends data frame to existing table. In this case the schema must match the original table.

    Defaults to `'replace'`. If `write_disposition` is specified as a keyword argument, this parameter is ignored (as both define the same functionality).
  - `**configuration_params`: Configuration parameters for the export job. See valid configuration parameters at LoadJobConfig docs.
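The three `if_exists` policies can be illustrated with a small sketch against an in-memory SQLite database. `export_rows` is a hypothetical helper written for this example; it mimics the documented semantics of `'fail'`, `'replace'`, and `'append'` and is not how the BigQuery client is implemented.

```python
import sqlite3

VALID_POLICIES = {"fail", "replace", "append"}


def export_rows(conn, table, rows, if_exists="replace"):
    """Hypothetical helper illustrating the three export policies.

    The table name is interpolated directly, so it must be trusted input.
    """
    if if_exists not in VALID_POLICIES:
        raise ValueError(f"Unknown policy: {if_exists!r}")

    # fetchall() runs the lookup to completion before any DDL is issued.
    exists = bool(
        conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table,),
        ).fetchall()
    )

    if exists and if_exists == "fail":
        raise ValueError(f"Table {table!r} already exists")
    if exists and if_exists == "replace":
        conn.execute(f'DROP TABLE "{table}"')
        exists = False
    if not exists:
        conn.execute(f'CREATE TABLE "{table}" (value INTEGER)')
    conn.executemany(f'INSERT INTO "{table}" VALUES (?)', [(r,) for r in rows])


conn = sqlite3.connect(":memory:")
export_rows(conn, "numbers", [1, 2])                     # table is created
export_rows(conn, "numbers", [3], if_exists="append")    # rows are added
export_rows(conn, "numbers", [9], if_exists="replace")   # table is rebuilt
remaining = [row[0] for row in conn.execute("SELECT value FROM numbers")]
print(remaining)  # [9]
```

Because `'replace'` drops the existing table, it discards earlier rows; `'append'` is the policy to choose when earlier exports must be preserved.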
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
When a select query is provided, this function will load at maximum 100,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
  - `query_string (str)`: Query to fetch a table or subset of a table.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `**kwargs`: Additional parameters to pass to the query. See Google BigQuery Python client docs for additional arguments.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the BigQuery warehouse. Sample is not guaranteed to be random.
- Args:
  - `schema (str)`: The schema to select the table from.
  - `size (int)`: The number of rows to sample. Defaults to 100,000.
  - `table (str)`: The table to sample from in the connected database.
- Returns: (`DataFrame`) Sampled data from the table.
mage_ai.io.postgres.Postgres
Handles data transfer between a PostgreSQL database and the Mage app. The `Postgres` client uses the following keys to connect to the PostgreSQL database:
POSTGRES_DBNAME: PostgreSQL database name
POSTGRES_USER: PostgreSQL database login username
POSTGRES_PASSWORD: PostgreSQL database login password
POSTGRES_HOST: PostgreSQL database hostname
POSTGRES_PORT: PostgreSQL database port
__init__(self, **kwargs)
Initializes the Postgres data loading client.
- Args:
  - `dbname (str)`: The name of the database to connect to.
  - `user (str)`: The user with which to connect to the database.
  - `password (str)`: The login password for the user.
  - `host (str)`: Host address for the database.
  - `port (str)`: Port on which the database is running.
  - `verbose (bool)`: Enables verbose output printing. Defaults to `False`.
  - `**kwargs`: Additional settings for creating the psycopg2 connection.
- `conn` (`psycopg2.connection.Connection`) - underlying psycopg2 Connection object
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Postgres
Creates Postgres data loading client from configuration settings.
- Args:
  - `config (BaseConfigLoader)`: Configuration loader object.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`Postgres`) The data loader constructed using this method
close - close()
Closes connection to PostgreSQL database.
commit - commit()
Saves all changes made to the database since the previous transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected PostgreSQL database. Any changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `query_string (str)`: The query to execute on the PostgreSQL database.
  - `**kwargs`: Additional parameters to pass to the query. See the `psycopg2` docs for configuring query parameters.
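Query parameters let the driver bind values separately from the SQL text instead of interpolating them into the query string. The sketch below shows the idea with Python's built-in `sqlite3` module, which uses `?` placeholders; psycopg2 uses `%s`-style placeholders instead. How exactly the Mage client forwards such parameters is not shown in this document, so treat this as an illustration of the driver-level concept only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE passengers (name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO passengers VALUES (?, ?)",
    [("Alice", 29), ("Bob", 41), ("Carol", 35)],
)

min_age = 30
# The value is passed separately from the SQL text, never formatted into it.
rows = conn.execute(
    "SELECT name FROM passengers WHERE age >= ? ORDER BY name", (min_age,)
).fetchall()
print(rows)  # [('Bob',), ('Carol',)]
```

Binding values this way avoids quoting mistakes and SQL injection that string formatting would invite.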
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, index: bool, **kwargs) -> None
Exports a Pandas data frame to the PostgreSQL database. If the table doesn't exist, it is automatically created. If the schema doesn't exist, the schema is also created.
Any changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `df (DataFrame)`: Data frame to export to the PostgreSQL database.
  - `table_name (str)`: Name of the table to export data to (excluding database and schema).
  - `database (str)`: Name of the database in which the table is located.
  - `schema (str)`: Name of the schema in which the table is located.
  - `if_exists (ExportWritePolicy)`: Specifies the export policy if the table exists. Either
    - `'fail'`: throw an error.
    - `'replace'`: drops existing table and creates new table of same name.
    - `'append'`: appends data frame to existing table.

    Defaults to `'replace'`.
  - `index (bool)`: If `True`, the data frame index is also exported alongside the table. Defaults to `False`.
  - `**kwargs`: Additional arguments to pass to the writer.
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 100,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
  - `query_string (str)`: Query to fetch a table or subset of a table.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `**kwargs`: Additional parameters to pass to the query. See the `psycopg2` docs for configuring query parameters.
- Returns: (`DataFrame`) Data frame containing the queried data.
open - open()
Opens a connection to PostgreSQL database.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the PostgreSQL database. Sample is not guaranteed to be random.
- Args:
  - `schema (str)`: The schema to select the table from.
  - `size (int)`: The number of rows to sample. Defaults to 100,000.
  - `table (str)`: The table to sample from in the connected database.
- Returns: (`DataFrame`) Sampled data from the table.
mage_ai.io.snowflake.Snowflake
Handles data transfer between a Snowflake data warehouse and the Mage app. The `Snowflake` client uses the following keys to authenticate access and connect to Snowflake servers:
SNOWFLAKE_USER: Snowflake username
SNOWFLAKE_PASSWORD: Snowflake password
SNOWFLAKE_ACCOUNT: Snowflake account ID (including region, excluding "snowflake-computing.com")
SNOWFLAKE_DEFAULT_WH: Default warehouse to use. Optional, if unspecified warehouse not chosen.
SNOWFLAKE_DEFAULT_DB: Default database to use. Optional, if unspecified database not chosen
SNOWFLAKE_DEFAULT_SCHEMA: Default schema to use. Optional, if unspecified schema not chosen
__init__(self, **kwargs)
Initializes settings for connecting to a Snowflake data warehouse. The following arguments must be provided to the connector; all other arguments are optional.
Required Arguments:
- `user (str)`: Username for the Snowflake user.
- `password (str)`: Login password for the user.
- `account (str)`: Snowflake account identifier (including region, excluding the `snowflake-computing.com` suffix).

Optional Arguments:
- `verbose (bool)`: Specify whether to print verbose output.
- `database (str)`: Name of the default database to use. If unspecified, no database is selected on login.
- `schema (str)`: Name of the default schema to use. If unspecified, no schema is selected on login.
- `warehouse (str)`: Name of the default warehouse to use. If unspecified, no warehouse is selected on login.

- `conn` (`snowflake.connector.Connection`) - underlying Snowflake Connection object
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Snowflake
Creates Snowflake data loading client from configuration settings.
- Args:
  - `config (BaseConfigLoader)`: Configuration loader object.
  - `verbose (bool)`: Enables verbose output printing. Defaults to False.
  - `**kwargs`: Additional parameters passed to the loader constructor.
- Returns: (`Snowflake`) The data loader constructed using this method
close - close()
Closes connection to Snowflake server.
commit - commit()
Saves all changes made to the warehouse since the previous transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected Snowflake warehouse. Any changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `query_string (str)`: The query to execute on the Snowflake warehouse.
  - `**kwargs`: Additional parameters to pass to the query. See Snowflake Connector Docs for additional parameters.
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None
Exports a Pandas data frame to a Snowflake warehouse based on the table name. If the table doesn't exist, it is automatically created.
Any changes made to the database will not be saved unless `commit()` is called afterward.
- Args:
  - `df (DataFrame)`: Data frame to export to a Snowflake warehouse.
  - `table_name (str)`: Name of the table to export data to (excluding database and schema).
  - `database (str)`: Name of the database in which the table is located.
  - `schema (str)`: Name of the schema in which the table is located.
  - `if_exists (str, optional)`: Specifies the export policy if the table exists. Either
    - `'fail'`: throw an error.
    - `'replace'`: drops existing table and creates new table of same name.
    - `'append'`: appends data frame to existing table.

    Defaults to `'append'`.
  - `**kwargs`: Additional arguments to pass to the writer.
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from Snowflake into a Pandas data frame based on the query given. This will fail unless a `SELECT` query is provided.
This function loads at most 100,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse using `execute`.
- Args:
  - `query_string (str)`: Query to fetch a table or subset of a table.
  - `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 100,000.
  - `*args, **kwargs`: Additional parameters to pass to the query. See Snowflake Connector Docs for additional parameters.
- Returns: (`DataFrame`) Data frame containing the queried data
open - open()
Opens a connection to Snowflake servers.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the Snowflake warehouse. Sample is not guaranteed to be random.
- Args:
  - `schema (str)`: The schema to select the table from.
  - `size (int)`: The number of rows to sample. Defaults to 100,000.
  - `table (str)`: The table to sample from in the connected database.
- Returns: (`DataFrame`) Sampled data from the table.
Connections to third-party data storage require you to specify confidential information such as login information or access keys. While you can manually specify this information in code while constructing data loading clients, we recommend not storing secrets directly in code.
Instead, Mage provides configuration loaders which allow data loading clients to use your secrets without explicitly writing them in code.
Currently, the following sources (and their corresponding configuration loader) can be used to load configuration settings:
- Configuration File - `ConfigFileLoader`
- Environment Variables - `EnvironmentVariableLoader`
- AWS Secrets Manager - `AWSSecretLoader`

For example, the code below constructs a Redshift data loading client using secrets stored in AWS Secrets Manager:
from mage_ai.io.config import AWSSecretLoader
from mage_ai.io.redshift import Redshift
config = AWSSecretLoader()
loader = Redshift.with_config(config)
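The idea behind a configuration loader can be sketched in a few lines: the client asks the loader for a named key, and the loader resolves it from some secret store. The example below resolves keys from environment variables. `SketchConfigKey` and `SketchEnvLoader` are hypothetical names written for this illustration; they are not Mage's `ConfigKey` or `EnvironmentVariableLoader`, whose interfaces may differ.

```python
import os
from enum import Enum


class SketchConfigKey(str, Enum):
    """Hypothetical subset of key names, copied from this document's key table."""

    POSTGRES_DBNAME = "POSTGRES_DBNAME"
    POSTGRES_USER = "POSTGRES_USER"
    POSTGRES_PASSWORD = "POSTGRES_PASSWORD"


class SketchEnvLoader:
    """Hypothetical loader that resolves settings from environment variables."""

    def __init__(self, environ=None):
        # A mapping can be injected for testing; defaults to the real environment.
        self.environ = os.environ if environ is None else environ

    def get(self, key):
        # Accept either the enum member or its plain string name.
        name = key.value if isinstance(key, Enum) else key
        return self.environ.get(name)


# A fake environment keeps the demo independent of the machine it runs on.
config = SketchEnvLoader({"POSTGRES_USER": "analyst"})
print(config.get(SketchConfigKey.POSTGRES_USER))  # analyst
print(config.get("POSTGRES_PASSWORD"))            # None
```

The client code never contains the secret itself, only the key name, which is the property the configuration loaders are meant to provide.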
The following is the set of allowed key names that your secrets must use in order for Mage's configuration loaders to recognize them. In code you can refer to these keys by their string name or using the `mage_ai.io.config.ConfigKey` enum. Not all keys need to be specified at once; only use the keys related to the services you use.
Key Name | Service | Client Constructor Parameter | Description | Notes |
---|---|---|---|---|
AWS_ACCESS_KEY_ID | AWS General | - | AWS Access Key ID credential | Used by Redshift and S3 |
AWS_SECRET_ACCESS_KEY | AWS General | - | AWS Secret Access Key credential | Used by Redshift and S3 |
AWS_SESSION_TOKEN | AWS General | - | AWS Session Token (used to generate temporary DB credentials) | Used by Redshift |
AWS_REGION | AWS General | - | AWS Region | Used by Redshift and S3 |
REDSHIFT_DBNAME | AWS Redshift | database | Name of Redshift database to connect to | |
REDSHIFT_HOST | AWS Redshift | host | Redshift Cluster hostname | Use with temporary credentials |
REDSHIFT_PORT | AWS Redshift | port | Redshift Cluster port. Optional, defaults to 5439. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_USER | AWS Redshift | user | Redshift temporary credentials username. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_PASSWORD | AWS Redshift | password | Redshift temporary credentials password. | Use with temporary credentials |
REDSHIFT_DBUSER | AWS Redshift | db_user | Redshift database user to generate credentials for. | Use to generate temporary credentials |
REDSHIFT_CLUSTER_ID | AWS Redshift | cluster_identifier | Redshift cluster ID | Use to generate temporary credentials |
REDSHIFT_IAM_PROFILE | AWS Redshift | profile | Name of the IAM profile to generate temporary credentials with | Use to generate temporary credentials |
POSTGRES_DBNAME | PostgreSQL | dbname | Database name | |
POSTGRES_USER | PostgreSQL | user | Database login username | |
POSTGRES_PASSWORD | PostgreSQL | password | Database login password | |
POSTGRES_HOST | PostgreSQL | host | Database hostname | |
POSTGRES_PORT | PostgreSQL | port | PostgreSQL database port | |
SNOWFLAKE_USER | Snowflake | user | Snowflake username | |
SNOWFLAKE_PASS | Snowflake | password | Snowflake password | |
SNOWFLAKE_ACCOUNT | Snowflake | account | Snowflake account ID (including region) | |
SNOWFLAKE_DEFAULT_DB | Snowflake | database | Default database to use. Optional, no database chosen if unspecified. | |
SNOWFLAKE_DEFAULT_SCHEMA | Snowflake | schema | Default schema to use. Optional, no schema chosen if unspecified. | |
SNOWFLAKE_DEFAULT_WH | Snowflake | warehouse | Default warehouse to use. Optional, no warehouse chosen if unspecified. | |
GOOGLE_SERVICE_ACC_KEY | Google BigQuery | credentials_mapping | Service account key | |
GOOGLE_SERVICE_ACC_KEY_FILEPATH | Google BigQuery | path_to_credentials | Path to service account key | |
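As a rough illustration of referring to a key by string name or by enum member, the sketch below uses a hypothetical ConfigKeySketch enum. It assumes the enum members carry their own names as string values; the real ConfigKey may be defined differently.

```python
from enum import Enum


class ConfigKeySketch(str, Enum):
    """Hypothetical stand-in for mage_ai.io.config.ConfigKey."""

    POSTGRES_DBNAME = "POSTGRES_DBNAME"
    POSTGRES_PORT = "POSTGRES_PORT"


def key_name(key):
    """Normalize an enum member or a plain string to the key's string name."""
    return key.value if isinstance(key, Enum) else str(key)


# Made-up settings keyed by the names from the table above.
settings = {"POSTGRES_DBNAME": "analytics", "POSTGRES_PORT": 5432}

by_enum = settings[key_name(ConfigKeySketch.POSTGRES_DBNAME)]
by_string = settings[key_name("POSTGRES_DBNAME")]
```

Both lookups resolve to the same setting, so the enum mainly helps with autocompletion and with catching misspelled key names early.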
This section contains the exact APIs and more detailed information on the configuration loaders. Every configuration loader has two functions:
- contains - checks if the configuration source contains the requested key. Commonly, the in operator is used to check whether a setting exists (the two are not always identical, as contains can accept multiple parameters while the in keyword only accepts the key).
    if config.contains(ConfigKey.POSTGRES_PORT):
        ...
    # alternatively
    if ConfigKey.POSTGRES_PORT in config:
        ...
- get - gets the configuration setting associated with the given key. If the key doesn't exist, returns None. Commonly, the data model overload __getitem__ is used to fetch a configuration setting (the two are not always identical, as get can accept multiple parameters while __getitem__ does not).
    user = config.get(ConfigKey.REDSHIFT_DBUSER)
    # alternatively
    user = config[ConfigKey.REDSHIFT_DBUSER]
These functions are shared among all configuration loaders, but depending on the source some function signatures may differ.
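The relationship between contains/get and the in/[] shorthands can be sketched with a small dict-backed class. DictConfigLoader is a hypothetical stand-in written for this example, not a Mage class, and it simply ignores any extra keyword arguments.

```python
class DictConfigLoader:
    """Hypothetical loader backed by a plain dict (not part of Mage)."""

    def __init__(self, settings):
        self._settings = dict(settings)

    def contains(self, key, **kwargs):
        # Extra keyword arguments mirror loaders whose contains() takes
        # more than the key; this sketch ignores them.
        return key in self._settings

    def get(self, key, **kwargs):
        # Returns None when the key is missing.
        return self._settings.get(key)

    def __contains__(self, key):
        # `key in loader` can only pass the key itself.
        return self.contains(key)

    def __getitem__(self, key):
        # `loader[key]` can only pass the key itself.
        return self.get(key)


config = DictConfigLoader({"POSTGRES_PORT": 5432})
port = config["POSTGRES_PORT"]
missing = config.get("REDSHIFT_DBUSER")
```

Because the operator forms delegate to the named methods with only the key, any source-specific options (such as a secret version) are reachable only through the explicit contains and get calls.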
Loads configuration settings from a configuration file.
Example:
from mage_ai.io.config import ConfigKey, ConfigFileLoader
config = ConfigFileLoader('path/to/my/config.yaml', 'my_profile')
postgres_db = config[ConfigKey.POSTGRES_DBNAME]
Constructor: __init__(filepath: os.PathLike, profile: str)
Initializes IO Configuration loader. Input configuration file can have two formats:
- Standard: contains a subset of the configuration keys specified in ConfigKey. This is the default and recommended format. Below is an example configuration file using this format.
    version: 0.1.0
    default:
      AWS_ACCESS_KEY_ID: AWS Access Key ID credential
      AWS_SECRET_ACCESS_KEY: AWS Secret Access Key credential
      AWS_REGION: AWS Region
      REDSHIFT_DBNAME: Name of Redshift database to connect to
      REDSHIFT_HOST: Redshift Cluster hostname
      REDSHIFT_PORT: Redshift Cluster port. Optional, defaults to 5439
      REDSHIFT_TEMP_CRED_USER: Redshift temp credentials username
      REDSHIFT_TEMP_CRED_PASSWORD: Redshift temp credentials password
  The above configuration file has a single profile named 'default'. Each profile organizes a set of keys to use (for example, distinguishing production keys versus development keys). A configuration file can have multiple profiles.
- Verbose: Instead of configuration keys, each profile stores an object of settings associated with each data migration client. This format was used in previous versions of this tool, and exists for backwards compatibility. Below is an example configuration file using this format.
    version: 0.0.0
    default:
      AWS:
        Redshift:
          database: Name of Redshift database to connect to
          host: Redshift Cluster hostname
          port: Redshift Cluster port. Optional, defaults to 5439
          user: Redshift temp credentials username
          password: Redshift temp credentials password
        access_key_id: AWS Access Key ID credential
        secret_access_key: AWS Secret Access Key credential
        region: AWS Region
Use handlebars and env_var syntax to reference environment variables in either configuration file format.
version: 0.1.0
default:
  GOOGLE_SERVICE_ACC_KEY_FILEPATH: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
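To give a feel for what resolving an env_var placeholder involves, here is a minimal regex-based sketch. The render_env_vars helper and the fake_env mapping are hypothetical, and the templating Mage actually performs may behave differently (for example, for unset variables).

```python
import os
import re

# Matches {{ env_var('NAME') }} with single or double quotes around NAME.
ENV_VAR_PATTERN = re.compile(
    r"\{\{\s*env_var\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}"
)


def render_env_vars(value, environ=os.environ):
    """Replace each env_var placeholder with the variable's value.

    Unset variables render as an empty string in this sketch.
    """
    return ENV_VAR_PATTERN.sub(lambda m: environ.get(m.group(1), ""), value)


# A made-up environment so the example does not depend on the real one.
fake_env = {"GOOGLE_APPLICATION_CREDENTIALS": "/secrets/service_account.json"}
rendered = render_env_vars(
    "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}", environ=fake_env
)
```

Keeping only the variable name in the configuration file means the file itself can be committed to version control without exposing the credential path or value.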
Args:
filepath (os.PathLike, optional)
: Path to IO configuration file. Defaults to '[repo_path]/io_config.yaml'.
profile (str, optional)
: Profile to load configuration settings from. Defaults to 'default'.
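The role of the profile argument can be sketched with a plain dict that mirrors a parsed Standard-format file. The load_profile helper, the production profile, and the database names are all hypothetical; the sketch only shows how a profile name selects one set of keys.

```python
def load_profile(parsed_config, profile="default"):
    """Return the settings stored under one profile of a parsed config file."""
    if profile not in parsed_config:
        raise KeyError(f"Profile {profile!r} not found in configuration")
    return parsed_config[profile]


# Dict mirroring a Standard-format file with two made-up profiles.
parsed_config = {
    "version": "0.1.0",
    "default": {"POSTGRES_DBNAME": "dev_db", "POSTGRES_PORT": 5432},
    "production": {"POSTGRES_DBNAME": "prod_db", "POSTGRES_PORT": 5432},
}

dev_settings = load_profile(parsed_config)
prod_settings = load_profile(parsed_config, "production")
```

Switching between development and production credentials then only requires changing the profile name passed to the loader, not the pipeline code.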
Methods:
contains - contains(self, key: ConfigKey | str) -> bool
Checks if the configuration setting stored under key exists.
- Args:
key (ConfigKey | str)
: Name of the configuration setting to check.
- Returns: (bool) Returns True if the configuration setting exists, otherwise returns False.
get - get(self, key: ConfigKey | str) -> Any
Loads the configuration setting stored under key.
- Args:
key (ConfigKey | str)
: Key name of the configuration setting to load.
- Returns: (Any) Configuration setting corresponding to the given key.
Loads configuration settings from environment variables in your current environment.
Example:
from mage_ai.io.config import ConfigKey, EnvironmentVariableLoader
config = EnvironmentVariableLoader()
postgres_db = config[ConfigKey.POSTGRES_DBNAME]
Constructor: __init__(self)
- No parameters for construction.
Methods:
contains - contains(env_var: ConfigKey | str) -> bool
Checks if the environment variable given by env_var exists.
- Args:
env_var (ConfigKey | str)
: Name of the environment variable to check the existence of.
- Returns: (bool) Returns True if the environment variable exists, otherwise returns False.
get - get(env_var: ConfigKey | str) -> Any
Loads the config setting stored under the environment variable env_var.
- Args:
env_var (ConfigKey | str)
: Name of the environment variable to load the configuration setting from.
- Returns: (Any) The configuration setting stored under env_var.
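The behavior described for contains and get maps closely onto os.environ. The EnvLoaderSketch class below is a hypothetical stand-in that accepts plain string names only, and the variable is set inside the example purely for demonstration.

```python
import os


class EnvLoaderSketch:
    """Hypothetical stand-in that reads settings from os.environ."""

    def contains(self, env_var):
        # True only if the variable is set in the current environment.
        return env_var in os.environ

    def get(self, env_var):
        # Returns None when the variable is not set.
        return os.environ.get(env_var)

    def __contains__(self, env_var):
        return self.contains(env_var)

    def __getitem__(self, env_var):
        return self.get(env_var)


# Set and unset variables here only so the example is reproducible.
os.environ["POSTGRES_DBNAME"] = "analytics"
os.environ.pop("POSTGRES_PORT", None)

config = EnvLoaderSketch()
db_name = config["POSTGRES_DBNAME"]
has_port = "POSTGRES_PORT" in config
```

Note that values read this way are always strings, so a setting such as a port number may need converting before use.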
Loads secrets from AWS Secrets Manager. To authenticate access to AWS Secrets Manager, either
- Configure your AWS profile using the AWS CLI
    aws configure
- Manually specify your AWS credentials when constructing the configuration loader
    config = AWSSecretLoader(
        aws_access_key_id='your access key id',
        aws_secret_access_key='your secret key',
        aws_region='your region',
    )
Example:
from mage_ai.io.config import ConfigKey, AWSSecretLoader
config = AWSSecretLoader()
postgres_db = config[ConfigKey.POSTGRES_DBNAME]
# with finer control on version
postgres_db = config.get(ConfigKey.POSTGRES_DBNAME, version_id='my_version_id')
Constructor: __init__(self, **kwargs)
- Keyword Arguments:
aws_access_key_id (str, Optional)
: AWS access key ID credential.
aws_secret_access_key (str, Optional)
: AWS secret access key credential.
aws_region (str, Optional)
: AWS region in which the Secrets Manager is created.
Methods:
contains - contains(secret_id: ConfigKey | str, version_id: str, version_stage_label: str) -> bool
Checks if a secret with ID secret_id exists. You can also specify the version of the secret to check. If
- both version_id and version_stage_label are specified, both must agree on the secret version.
- neither version_id nor version_stage_label is specified, any version is checked.
- only one of version_id and version_stage_label is specified, the associated version is checked.
When using the in operator, a specific version cannot be specified.
- Args:
secret_id (ConfigKey | str)
: ID of the secret to check.
version_id (str, Optional)
: ID of the version of the secret to check. Defaults to None.
version_stage_label (str, Optional)
: Staging label of the version of the secret to check. Defaults to None.
- Returns: (bool) Returns True if the secret exists, otherwise returns False.
get - get(secret_id: ConfigKey | str, version_id: str, version_stage_label: str) -> bytes | str
Loads the secret stored under secret_id. You can also specify the version of the secret to fetch. If
- both version_id and version_stage_label are specified, both must agree on the secret version.
- neither version_id nor version_stage_label is specified, the current version is loaded.
- only one of version_id and version_stage_label is specified, the associated version is loaded.
When using the __getitem__ overload, a specific version cannot be specified.
- Args:
secret_id (ConfigKey | str)
: ID of the secret to load.
version_id (str, Optional)
: ID of the version of the secret to load. Defaults to None.
version_stage_label (str, Optional)
: Staging label of the version of the secret to load. Defaults to None.
- Returns: (bytes | str) The secret stored under secret_id in AWS Secrets Manager. If the secret is a binary value, returns a bytes object; otherwise returns a str object.
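The three version-selection rules above can be sketched with an in-memory store. SecretStoreSketch is a hypothetical stand-in and does not call AWS; it borrows the AWSCURRENT and AWSPREVIOUS staging labels that Secrets Manager uses, and returning None on a mismatch is a choice of this sketch rather than documented loader behavior.

```python
class SecretStoreSketch:
    """In-memory stand-in for a versioned secret store (not the AWS API)."""

    def __init__(self):
        # secret_id -> {version_id: (value, set of staging labels)}
        self._secrets = {}

    def put(self, secret_id, version_id, value, labels):
        versions = self._secrets.setdefault(secret_id, {})
        versions[version_id] = (value, set(labels))

    def get(self, secret_id, version_id=None, version_stage_label=None):
        if version_id is None and version_stage_label is None:
            # Neither given: load the current version.
            version_stage_label = "AWSCURRENT"
        for vid, (value, labels) in self._secrets.get(secret_id, {}).items():
            if version_id is not None and vid != version_id:
                continue
            if version_stage_label is not None and version_stage_label not in labels:
                continue
            # Every filter that was supplied agrees on this version.
            return value
        return None


store = SecretStoreSketch()
store.put("POSTGRES_PASSWORD", "v1", "old-secret", ["AWSPREVIOUS"])
store.put("POSTGRES_PASSWORD", "v2", b"new-secret", ["AWSCURRENT"])

current = store.get("POSTGRES_PASSWORD")
previous = store.get("POSTGRES_PASSWORD", version_stage_label="AWSPREVIOUS")
mismatch = store.get(
    "POSTGRES_PASSWORD", version_id="v1", version_stage_label="AWSCURRENT"
)
```

The current version here is stored as bytes and the previous one as a string, mirroring the bytes | str return type: callers should be prepared to handle either.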