🎉 Dagster/Airbyte Integration (#1)
A working project showcasing Pythonic configuration with Dagster to generate Airbyte artefacts
sspaeti authored Dec 15, 2022
1 parent adced77 commit 8ab3da4
Showing 29 changed files with 1,025 additions and 0 deletions.
11 changes: 11 additions & 0 deletions dagster/Pipfile
@@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]

[dev-packages]

[requires]
python_version = "3.10"
68 changes: 68 additions & 0 deletions dagster/readme.md
@@ -0,0 +1,68 @@

# Dagster Orchestration Layer

## Getting started
Set up a virtual environment and install the dependencies:
```bash
cd stargazer
pip install -e ".[dev]"
```

If you try to kick off a run immediately, it will fail, as there is no source data to ingest/transform, nor is there an active Airbyte connection. To get everything set up properly, read on.

## Set up local Postgres

We'll use a local Postgres instance as the destination for our data. Think of the "destination" as a data warehouse (something like Snowflake).

To get a Postgres instance with the required source and destination databases running on your machine, run:

```bash
docker pull postgres
docker run --name local-postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres
```
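
Once the container is up, a minimal connectivity check might look like this (a sketch, assuming `psycopg2-binary` is installed; the credentials match the `docker run` command above):

```python
# Minimal connectivity check for the local Postgres container.
import psycopg2  # assumes: pip install psycopg2-binary

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",  # POSTGRES_PASSWORD from the docker run above
    dbname="postgres",
)
print("Connected, server version:", conn.server_version)
conn.close()
```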

## Set up Airbyte

Now, you'll want to get Airbyte running locally. The full instructions can be found [here](https://docs.airbyte.com/deploying-airbyte/local-deployment), but if you just want to run some commands (in a separate terminal):

```bash
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
```

Once you've done this, you should be able to go to http://localhost:8000 and see Airbyte's UI.
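
To verify the server is reachable without opening a browser, here is a quick sketch (the `api/v1/health` endpoint path is an assumption based on the Airbyte OSS API):

```python
# Ping the Airbyte server's health endpoint (endpoint path assumed).
import requests

resp = requests.get("http://localhost:8000/api/v1/health")
print(resp.status_code, resp.json())  # expect 200 and {"available": true}
```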

## Set up data and connections

First, check which Airbyte artifacts the reconciler defined in `stargazer.py` would create or change:

```bash
dagster-me check --module assets_modern_data_stack.assets.stargazer:airbyte_reconciler
```

Then apply them to your local Airbyte instance:

```bash
dagster-me apply --module assets_modern_data_stack.assets.stargazer:airbyte_reconciler
```
➡️ Make sure the environment variable `AIRBYTE_PASSWORD` is set on your machine (the default password is `password`), and [create](https://github.com/settings/tokens) a personal access token, exported as `AIRBYTE_PERSONAL_GITHUB_TOKEN`, for fetching the stargazers from the public repositories.
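
As a quick sanity check, this minimal sketch fails fast when either variable is missing from the current shell environment:

```python
# Fail fast if the required environment variables are missing;
# the variable names are the ones referenced in this readme.
import os

for var in ("AIRBYTE_PASSWORD", "AIRBYTE_PERSONAL_GITHUB_TOKEN"):
    if not os.environ.get(var):
        raise EnvironmentError(f"Please export {var} before running dagster-me.")
```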

## Start Dagit, the Dagster UI

To start Dagit, run:

```bash
dagit
```

You'll see the Airbyte and dbt assets that this demo creates automatically.
Click "Materialize" inside Dagit to sync the Airbyte connections and run dbt.

## Start the BI Dashboard with Metabase

Start Metabase in a separate shell and follow the docs in the [Metabase Readme](../../visualization/metabase/readme.md).
See a step-by-step guide on the [Airbyte Blog](https://airbyte.com/blog/).
1 change: 1 addition & 0 deletions dagster/stargazer/assets_modern_data_stack/__init__.py
@@ -0,0 +1 @@
from .repository import assets_modern_data_stack
Empty file.
158 changes: 158 additions & 0 deletions dagster/stargazer/assets_modern_data_stack/assets/stargazer.py
@@ -0,0 +1,158 @@
from dagster import asset
from dagster_airbyte import (
AirbyteManagedElementReconciler,
airbyte_resource,
AirbyteConnection,
AirbyteSyncMode,
load_assets_from_connections,
)
from dagster_airbyte.managed.generated.sources import GithubSource
from dagster_airbyte.managed.generated.destinations import (
LocalJsonDestination,
PostgresDestination,
)
from typing import List
from dagster_dbt import load_assets_from_dbt_project


from bs4 import BeautifulSoup
import os
import requests

import asyncio
import aiohttp
from ..utils.constants import DBT_PROJECT_DIR


AIRBYTE_PERSONAL_GITHUB_TOKEN = os.environ.get(
"AIRBYTE_PERSONAL_GITHUB_TOKEN", "please-set-your-token"
)
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "please-set-your-password")


airbyte_instance = airbyte_resource.configured(
{
"host": "localhost",
"port": "8000",
"username": "airbyte",
"password": {"env": "AIRBYTE_PASSWORD"},
}
)
# two other ways to load the Airbyte assets
# airbyte_assets = load_assets_from_airbyte_project(
# project_dir="../../../../airbyte/test",
# )

# airbyte_assets = with_resources(
# [load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
# {"airbyte": airbyte_instance},
# )


async def get(url, session):
try:
# check if status_code is 200
async with session.get(url) as response:
if response.status == 200:
return url
else:
return None

except Exception as e:
print("Unable to get url {} due to {}.".format(url, e.__class__))


async def check_websites_exists(urls) -> List[str]:
    async with aiohttp.ClientSession() as session:
        # fetch all URLs concurrently and keep only those that returned a 200
        tasks = [get(url, session) for url in urls]
        results = await asyncio.gather(*tasks)
        results = [result for result in results if result is not None]
        return results


def get_awesome_repo_list() -> str:

url = "https://github.com/igorbarinov/awesome-data-engineering"
html = requests.get(url)
soup = BeautifulSoup(html.text, "html.parser")
    # parse all links into a list, keeping only those that start with github.com
    links = [
        link.get("href")
        for link in soup.find_all("a")
        if link.get("href") and link.get("href").startswith("https://github.com")
    ]
# remove links that start with url
links = [
link
for link in links
if not link.startswith(url) and not link.endswith("github.com")
]
# remove last slash if there
links = [link[:-1] if link.endswith("/") else link for link in links]
# remove repos without organization
links = [link for link in links if len(link.split("/")) == 5]
    # check concurrently whether the links still exist, to save time
    existing_links = asyncio.run(check_websites_exists(links))
    # remove `https://github.com/` from links
    links = [link.replace("https://github.com/", "") for link in existing_links]

    # Airbyte validates each repo when checking the connection and can hit
    # timeouts on long lists, so limit to 10 repos to keep this demo working
    links = links[0:10]

# return links as a string with blank space as separator
return " ".join(links)
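
# get_awesome_repo_list() returns a space-separated string of "org/repo"
# slugs, e.g. (hypothetical values): "apache/airflow dbt-labs/dbt-core"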


gh_awesome_de_list_source = GithubSource(
name="gh_awesome_de_list",
credentials=GithubSource.PATCredentials(AIRBYTE_PERSONAL_GITHUB_TOKEN),
start_date="2020-01-01T00:00:00Z",
repository=get_awesome_repo_list(), # "prometheus/haproxy_exporter",
page_size_for_large_streams=100,
)

postgres_destination = PostgresDestination(
name="postgres",
host="localhost",
port=5432,
database="postgres",
schema="public",
username="postgres",
password=POSTGRES_PASSWORD,
ssl_mode=PostgresDestination.Disable(),
)

stargazer_connection = AirbyteConnection(
name="fetch_stargazer",
source=gh_awesome_de_list_source,
destination=postgres_destination,
stream_config={"stargazers": AirbyteSyncMode.incremental_append_dedup()},
normalize_data=True,
)

airbyte_reconciler = AirbyteManagedElementReconciler(
airbyte=airbyte_instance,
connections=[stargazer_connection],
)
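
# `dagster-me check` / `dagster-me apply` (see the readme) use this reconciler
# to diff the declared source, destination, and connection against the running
# Airbyte instance and push any changes.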

# load Airbyte assets from the Pythonic connection definition above
airbyte_assets = load_assets_from_connections(
airbyte=airbyte_instance,
connections=[stargazer_connection],
key_prefix=["postgres"],
)

# prepare assets based on the existing dbt project
dbt_assets = load_assets_from_dbt_project(
project_dir=DBT_PROJECT_DIR, io_manager_key="db_io_manager", key_prefix="postgres"
)


# @asset(
# description="The metabase dashboard where the stargazers are visualized",
# metadata={"dashboard_url": "http://localhost:3000/dashboard/1-airbyte-sync-status"},
# )
# def metabase_dashboard(mart_gh_cumulative):
# return "test"
34 changes: 34 additions & 0 deletions dagster/stargazer/assets_modern_data_stack/db_io_manager.py
@@ -0,0 +1,34 @@
import pandas as pd

from dagster import IOManager, io_manager


class DbIOManager(IOManager):
"""Sample IOManager to handle loading the contents of tables as pandas DataFrames.
Does not handle cases where data is written to different schemas for different outputs, and
uses the name of the asset key as the table name.
"""

def __init__(self, con_string: str):
self._con = con_string

def handle_output(self, context, obj):
if isinstance(obj, pd.DataFrame):
# write df to table
obj.to_sql(name=context.asset_key.path[-1], con=self._con, if_exists="replace")
elif obj is None:
# dbt has already written the data to this table
pass
else:
raise ValueError(f"Unsupported object type {type(obj)} for DbIOManager.")

def load_input(self, context) -> pd.DataFrame:
"""Load the contents of a table as a pandas DataFrame."""
model_name = context.asset_key.path[-1]
return pd.read_sql(f"SELECT * FROM {model_name}", con=self._con)


@io_manager(config_schema={"con_string": str})
def db_io_manager(context):
return DbIOManager(context.resource_config["con_string"])
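
A minimal sketch of configuring this io_manager on its own, assuming the local Postgres container from the readme (the connection string uses the standard SQLAlchemy URL format):

```python
# Hypothetical standalone configuration of db_io_manager; credentials match
# the local Postgres container suggested in the readme.
from assets_modern_data_stack.db_io_manager import db_io_manager

configured_db_io_manager = db_io_manager.configured(
    {"con_string": "postgresql://postgres:password@localhost:5432/postgres"}
)
```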
27 changes: 27 additions & 0 deletions dagster/stargazer/assets_modern_data_stack/repository.py
@@ -0,0 +1,27 @@
from dagster_airbyte import airbyte_resource
from dagster_dbt import dbt_cli_resource

from dagster import (
repository,
with_resources,
)

# from . import assets
from .db_io_manager import db_io_manager
from .utils.constants import DBT_CONFIG, POSTGRES_CONFIG
from .assets.stargazer import airbyte_assets, dbt_assets # , metabase_dashboard


@repository
def assets_modern_data_stack():
return [
airbyte_assets,
with_resources(
dbt_assets, # load_assets_from_package_module(assets),
resource_defs={
"dbt": dbt_cli_resource.configured(DBT_CONFIG),
"db_io_manager": db_io_manager.configured(POSTGRES_CONFIG),
},
),
# metabase_dashboard,
]
28 changes: 28 additions & 0 deletions dagster/stargazer/assets_modern_data_stack/utils/constants.py
@@ -0,0 +1,28 @@
from dagster_postgres.utils import get_conn_string
from dagster._utils import file_relative_path


# AIRBYTE_CONFIG = {"host": "localhost", "port": "8000"}
DBT_PROJECT_DIR = file_relative_path(__file__, "../../transformation_dbt")
DBT_PROFILES_DIR = file_relative_path(__file__, "../../transformation_dbt/config")


DBT_CONFIG = {"project_dir": DBT_PROJECT_DIR, "profiles_dir": DBT_PROFILES_DIR}

PG_DESTINATION_CONFIG = {
"username": "postgres",
"password": "password",
"host": "localhost",
"port": 5432,
"database": "postgres",
}

POSTGRES_CONFIG = {
"con_string": get_conn_string(
username=PG_DESTINATION_CONFIG["username"],
password=PG_DESTINATION_CONFIG["password"],
hostname=PG_DESTINATION_CONFIG["host"],
port=str(PG_DESTINATION_CONFIG["port"]),
db_name=PG_DESTINATION_CONFIG["database"],
)
}