How to build a data integration pipeline

Here are the high-level steps to build a data integration pipeline:

  1. Add new data integration pipeline
  2. Configure source
    1. Configure schema
  3. Configure destination
  4. Run pipeline and start sync
    1. Monitoring pipeline

Demo video

Mage.Data.Integration.Demo_.Salesforce.Snowflake.mp4

Add new data integration pipeline

  1. Open Mage in your browser and click the [+ New pipeline] button.
  2. Click the dropdown menu option Data integration.

Configure source

  1. Click the dropdown menu under Select source and choose the option you want to load data from (e.g. Amplitude).
  2. Depending on the chosen source, you’ll need to enter credentials and options into the section labeled Configuration. For example, if you chose Amplitude, you’ll need to enter credentials like this:
    api_key: abc456
    secret_key: "{{ env_var('SECRET_KEY') }}"
    Best practices: you can interpolate values in the configuration using the following syntax:
    1. "{{ env_var('SECRET_KEY') }}": this will extract the value from the SECRET_KEY key in your environment variables.
    2. "{{ variables('SECRET_KEY') }}": this will extract the value from the SECRET_KEY key in your runtime variables.
  3. After you enter all the credentials, click the button [Fetch list of streams] under the section labeled Select stream.
  4. Shortly after you click that button, open the new dropdown menu under the section labeled Select stream and choose the stream (aka table) you want to load data from.
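
To make the interpolation syntax from step 2 concrete, here is a minimal sketch of how such placeholders could be resolved. It is an illustration only, not Mage's actual implementation: `resolve`, `resolve_config`, and the regular expression are hypothetical names introduced for this example, and raising an error on a missing key is an assumption.

```python
import os
import re

# Matches "{{ env_var('NAME') }}" or "{{ variables('NAME') }}".
# Hypothetical pattern for illustration; Mage's real parser may differ.
PLACEHOLDER = re.compile(
    r"\{\{\s*(env_var|variables)\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}"
)


def resolve(value, runtime_variables, environ=None):
    """Replace every placeholder in `value` with the value it points to."""
    environ = os.environ if environ is None else environ

    def lookup(match):
        source, key = match.group(1), match.group(2)
        # env_var(...) reads environment variables,
        # variables(...) reads runtime variables.
        table = environ if source == "env_var" else runtime_variables
        if key not in table:
            # Assumption: fail loudly rather than leave the placeholder in place.
            raise KeyError(f"{source}('{key}') is not set")
        return str(table[key])

    return PLACEHOLDER.sub(lookup, value)


def resolve_config(config, runtime_variables, environ=None):
    """Resolve placeholders in every string value of a flat config dict."""
    return {
        key: resolve(val, runtime_variables, environ) if isinstance(val, str) else val
        for key, val in config.items()
    }
```

With the Amplitude example above, `resolve_config({"api_key": "abc456", "secret_key": "{{ env_var('SECRET_KEY') }}"}, {})` would leave `api_key` untouched and replace `secret_key` with the value of the `SECRET_KEY` environment variable, so the secret never has to be typed into the configuration itself.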

Configure schema

After selecting a stream (aka table), you’ll need to configure the schema.

Configuring the schema tells your pipeline which fields to synchronize, how to determine whether a record is unique, and what to do if there are conflicts (aka duplicate records).

Here are the steps you can optionally go through:

  1. Selected field(s):
    1. Check the box next to the field name to include the field in your synchronization.
    2. Uncheck the ones you don’t want to sync.
  2. Field type(s)
    1. Each field will have a default field type.
    2. Add field types, or remove the ones that don’t fit your needs.
  3. Unique field(s)
    1. To the right of each field name is a box; check it to mark the field as one that must have unique values.
    2. If the box cannot be checked, that field cannot be used as a unique field.
  4. Bookmark field(s)
    1. Under the column labeled Bookmark, check the box to use the field as a way to keep track of progress during synchronization.
    2. On every synchronization, these columns are used to pick up where the previous synchronization left off. In addition, if a synchronization fails midway, these bookmark columns track the most recent record that was synchronized successfully.
  5. Replication method
    1. FULL_TABLE: synchronize the entire set of records from the source.
    2. INCREMENTAL: synchronize the records starting after the most recent bookmarked record from the previous synchronization run.
  6. Unique conflict method: choose how to handle duplicate records
    1. IGNORE: skip the new record if it’s a duplicate of an existing record.
    2. UPDATE: update the existing record with the new record’s properties.
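
The replication and conflict options above can be sketched in a few lines. This is a simplified in-memory model under stated assumptions, not how Mage implements synchronization: `select_records` and `merge_records` are hypothetical helpers, records are plain dicts, and "after the most recent bookmarked record" is interpreted as a strictly greater bookmark value.

```python
def select_records(source_rows, replication_method, bookmark_field=None, last_bookmark=None):
    """Pick the source rows to sync for one run."""
    if replication_method == "FULL_TABLE":
        # Synchronize the entire set of records.
        return list(source_rows)
    if replication_method == "INCREMENTAL":
        if last_bookmark is None:
            # No previous run recorded, so everything is new.
            return list(source_rows)
        # Assumption: "after" the bookmark means strictly greater.
        return [row for row in source_rows if row[bookmark_field] > last_bookmark]
    raise ValueError(f"unknown replication method: {replication_method}")


def merge_records(existing, incoming, unique_fields, conflict_method):
    """Apply incoming rows to existing rows, keyed by the unique field(s)."""
    by_key = {tuple(row[f] for f in unique_fields): dict(row) for row in existing}
    for row in incoming:
        key = tuple(row[f] for f in unique_fields)
        if key not in by_key:
            by_key[key] = dict(row)      # new record: always insert
        elif conflict_method == "UPDATE":
            by_key[key].update(row)      # duplicate: overwrite properties
        elif conflict_method != "IGNORE":
            raise ValueError(f"unknown conflict method: {conflict_method}")
        # IGNORE: duplicate is skipped, existing record is kept as-is
    return list(by_key.values())
```

For example, with `id` as the unique field, an incoming row that shares an `id` with an existing row is dropped under `IGNORE` and overwrites the existing row's properties under `UPDATE`; rows with a new `id` are added in both cases.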

Configure destination

  1. Click the dropdown menu under Select destination and choose the option you want to export data to (e.g. Snowflake).
  2. Depending on the chosen destination, you’ll need to enter credentials and options into the section labeled Configuration. For example, if you chose Snowflake, you’ll need to enter credentials like this:
    account: ...
    database: ...
    password: "{{ env_var('PASSWORD') }}"
    schema: ...
    table: ...
    username: ...
    warehouse: ...
    Best practices: you can interpolate values in the configuration using the following syntax:
    1. "{{ env_var('PASSWORD') }}": this will extract the value from the PASSWORD key in your environment variables.
    2. "{{ variables('PASSWORD') }}": this will extract the value from the PASSWORD key in your runtime variables.

Run pipeline and start sync

Once you’re done configuring your pipeline, go back to the pipeline’s triggers page by clicking the name of your pipeline in the header.

The breadcrumbs in your header could look like this: Pipelines / pipeline name / Edit.

Once you’re on the pipeline triggers page, create a new scheduled trigger and choose the @once interval. For more schedules, read the other options here.

Monitoring pipeline

After you create a scheduled trigger, click the [Start trigger] button at the top of the page.

You’ll see a new pipeline run appear shortly on the screen.

You can click the logs for that pipeline run to view the progress of your synchronization.


Support

If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack and someone will help you ASAP.