Ruff ify #139

Merged: 6 commits, Jan 3, 2025
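Summary of the change set: a Ruff lint step is added to the deploy-dagster-cloud workflow, a ruff target is added to the Makefile, dbt-snowflake is pinned to <=1.8.4 in CI, and the hooli-bi and hooli-data-ingest packages are mechanically reformatted to Ruff's style.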
8 changes: 7 additions & 1 deletion .github/workflows/deploy-dagster-cloud.yml
@@ -32,6 +32,7 @@ jobs:
if: steps.prerun.outputs.result != 'skip'
with:
ref: ${{ github.head_ref }}

- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@v45
@@ -107,7 +108,12 @@ jobs:
run: |
uv venv
source .venv/bin/activate
uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb dbt-snowflake --upgrade;
uv pip install dagster-dbt dagster-cloud dbt-core dbt-duckdb "dbt-snowflake<=1.8.4" --upgrade;

- name: Run Ruff
uses: astral-sh/ruff-action@v3
with:
args: check --fix --output-format=github .

- name: Validate configuration
id: ci-validate
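Two changes land in this workflow: dbt-snowflake is pinned to <=1.8.4 (presumably to hold back a breaking newer release), and a lint step is added via astral-sh/ruff-action@v3. The args check --fix --output-format=github let Ruff auto-fix what it can and emit any remaining violations as inline annotations on the pull request.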
3 changes: 3 additions & 0 deletions Makefile
@@ -28,6 +28,9 @@ update_packages:
uv lock --upgrade --directory hooli-data-ingest;
uv lock --upgrade --directory hooli-bi;

ruff:
-ruff check --fix .
ruff format .

# ensure that DAGSTER_GIT_REPO_DIR is set to the path of the dagster repo
# see https://www.notion.so/dagster/Local-Dev-Setup-e58aba352f704dcc88a8dc44cb1ce7fc for more details
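The leading hyphen in -ruff check --fix . is make's ignore-errors prefix: a nonzero exit from unfixable lint violations does not abort the recipe, so ruff format . still runs and make ruff always leaves the tree formatted.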
2 changes: 1 addition & 1 deletion hooli-bi/hooli_bi/__init__.py
@@ -1 +1 @@
from hooli_bi.definitions import defs as defs
from hooli_bi.definitions import defs as defs
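Both sides of this one-line diff read the same, so the change is most likely the missing trailing newline that ruff format adds. The from ... import defs as defs spelling itself is the explicit re-export idiom from PEP 484, which Ruff's F401 rule recognizes as intentional. An equivalent spelling, as a sketch:

from hooli_bi.definitions import defs

__all__ = ["defs"]  # explicit re-export list; same intent as "import defs as defs"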
53 changes: 31 additions & 22 deletions hooli-bi/hooli_bi/powerbi_assets.py
@@ -8,6 +8,7 @@
from dagster_powerbi.translator import PowerBIContentData
from hooli_bi.powerbi_workspace import power_bi_workspace


class MyCustomPowerBITranslator(DagsterPowerBITranslator):
def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_report_spec(data)
@@ -16,44 +17,52 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
description=f"Report link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/reports/{data.properties["id"]}",
group_name="BI",
)
return merge_attributes(specs_replaced, tags={"core_kpis":""})
return merge_attributes(specs_replaced, tags={"core_kpis": ""})

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_semantic_model_spec(data)
spec_replaced = replace_attributes(
spec,
description=f"Semantic model link: https://app.powerbi.com/groups/{EnvVar("AZURE_POWERBI_WORKSPACE_ID").get_value()}/datasets/{data.properties["id"]}/details",
group_name="BI",
deps=[AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]]) for dep in spec.deps],
deps=[
AssetKey(path=[dep.asset_key.path[1].upper(), dep.asset_key.path[2]])
for dep in spec.deps
],
)
return merge_attributes(spec_replaced,
metadata={"dagster/column_schema": TableSchema(
columns=[TableColumn(
name=col["name"],
type=col["dataType"],
tags={"PII":""} if col["name"] == "USER_ID" else None)
for col in data.properties["tables"][0]["columns"]])},
tags={"core_kpis":""},
return merge_attributes(
spec_replaced,
metadata={
"dagster/column_schema": TableSchema(
columns=[
TableColumn(
name=col["name"],
type=col["dataType"],
tags={"PII": ""} if col["name"] == "USER_ID" else None,
)
for col in data.properties["tables"][0]["columns"]
]
)
},
tags={"core_kpis": ""},
)

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_dashboard_spec(data)
return replace_attributes(
spec,
group_name="BI"
)

return replace_attributes(spec, group_name="BI")

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
spec = super().get_data_source_spec(data)
return replace_attributes(
spec,
group_name="BI"
)
return replace_attributes(spec, group_name="BI")


powerbi_assets = [
build_semantic_model_refresh_asset_definition(resource_key="power_bi", spec=spec)
if spec.tags.get("dagster-powerbi/asset_type") == "semantic_model"
else spec
for spec in load_powerbi_asset_specs(
power_bi_workspace, dagster_powerbi_translator=MyCustomPowerBITranslator, use_workspace_scan=True)
]
power_bi_workspace,
dagster_powerbi_translator=MyCustomPowerBITranslator,
use_workspace_scan=True,
)
]
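One caveat for reviewers: the description f-strings above nest double quotes inside double-quoted literals (e.g. data.properties["id"]), which relies on PEP 701 and therefore requires Python 3.12 or newer. A pre-3.12-compatible sketch of the same expression, alternating quote styles:

# pre-3.12 equivalent: single quotes inside the double-quoted f-string
description = (
    f"Report link: https://app.powerbi.com/groups/"
    f"{EnvVar('AZURE_POWERBI_WORKSPACE_ID').get_value()}"
    f"/reports/{data.properties['id']}"
)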
2 changes: 1 addition & 1 deletion hooli-bi/hooli_bi/powerbi_workspace.py
@@ -9,4 +9,4 @@
tenant_id=EnvVar("AZURE_POWERBI_TENANT_ID"),
),
workspace_id=EnvVar("AZURE_POWERBI_WORKSPACE_ID"),
)
)
2 changes: 1 addition & 1 deletion hooli-data-ingest/hooli_data_ingest/__init__.py
@@ -1 +1 @@
from hooli_data_ingest.definitions import defs as defs
from hooli_data_ingest.definitions import defs as defs
35 changes: 18 additions & 17 deletions hooli-data-ingest/hooli_data_ingest/assets/sling.py
@@ -1,33 +1,34 @@
from dagster_embedded_elt.sling import (
sling_assets,
SlingResource,
sling_assets,
SlingResource,
)
from dagster_embedded_elt.sling.dagster_sling_translator import DagsterSlingTranslator

from hooli_data_ingest.resources import replication_config


class CustomSlingTranslator(DagsterSlingTranslator):
def __init__(self, target_prefix="RAW_DATA"):
def __init__(self, target_prefix="RAW_DATA"):
super().__init__(target_prefix=target_prefix)
self.replication_config = replication_config
def get_group_name(self, stream_definition):
return "RAW_DATA"
def get_kinds(self, stream_definition):
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):

def get_group_name(self, stream_definition):
return "RAW_DATA"

def get_kinds(self, stream_definition):
storage_kind = self.replication_config.get("target", "DUCKDB")
if storage_kind.startswith("SNOWFLAKE"):
storage_kind = "SNOWFLAKE"
return {"sling",storage_kind}

return {"sling", storage_kind}


@sling_assets(
replication_config=replication_config,
dagster_sling_translator=CustomSlingTranslator(),
replication_config=replication_config,
dagster_sling_translator=CustomSlingTranslator(),
)
def my_sling_assets(context, sling: SlingResource):
yield from sling.replicate(context=context).fetch_column_metadata().fetch_row_count()
for row in sling.stream_raw_logs():
context.log.info(row)
yield from (
sling.replicate(context=context).fetch_column_metadata().fetch_row_count()
)
for row in sling.stream_raw_logs():
context.log.info(row)
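Beyond whitespace, the translator logic is unchanged: get_kinds collapses any SNOWFLAKE_* target (SNOWFLAKE_PROD or SNOWFLAKE_BRANCH) to the single kind SNOWFLAKE. A minimal sketch of that behavior, assuming the module imports resolve and using a hand-built config:

# illustrative only: get_kinds ignores its argument and reads the config's target
translator = CustomSlingTranslator()
translator.replication_config = {"target": "SNOWFLAKE_PROD"}  # hypothetical value
assert translator.get_kinds(stream_definition=None) == {"sling", "SNOWFLAKE"}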
23 changes: 10 additions & 13 deletions hooli-data-ingest/hooli_data_ingest/definitions.py
@@ -1,11 +1,10 @@
from pathlib import Path

from dagster import (
AnchorBasedFilePathMapping,
Definitions,
with_source_code_references,
AnchorBasedFilePathMapping,
Definitions,
with_source_code_references,
)
from dagster._core.definitions.metadata import with_source_code_references
from dagster_cloud.metadata.source_code import link_code_references_to_git_if_cloud

from hooli_data_ingest.assets.sling import my_sling_assets
@@ -15,16 +14,14 @@


defs = Definitions(
assets=link_code_references_to_git_if_cloud(
with_source_code_references([my_sling_assets]),
file_path_mapping=AnchorBasedFilePathMapping(
assets=link_code_references_to_git_if_cloud(
with_source_code_references([my_sling_assets]),
file_path_mapping=AnchorBasedFilePathMapping(
local_file_anchor=Path(__file__),
file_anchor_path_in_repository="hooli-data-ingest/hooli_data_ingest/definitions.py",
),
),
schedules=[daily_sling_assets],
jobs=[daily_sling_job],
resources={
"sling": sling_resource
},
),
schedules=[daily_sling_assets],
jobs=[daily_sling_job],
resources={"sling": sling_resource},
)
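This cleanup also removes a duplicate import: with_source_code_references was previously pulled in both from the public dagster namespace and from the private dagster._core.definitions.metadata path; only the public import survives.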
4 changes: 2 additions & 2 deletions hooli-data-ingest/hooli_data_ingest/jobs/__init__.py
@@ -5,6 +5,6 @@


daily_sling_job = define_asset_job(
name="daily_sling_job",
selection=raw_location_by_day,
name="daily_sling_job",
selection=raw_location_by_day,
)
96 changes: 44 additions & 52 deletions hooli-data-ingest/hooli_data_ingest/resources/__init__.py
@@ -3,16 +3,18 @@

from dagster import EnvVar
from dagster_embedded_elt.sling import (
SlingResource,
SlingConnectionResource,
SlingResource,
SlingConnectionResource,
)


def get_env():
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"
if os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", "") == "1":
return "BRANCH"
if os.getenv("DAGSTER_CLOUD_DEPLOYMENT_NAME", "") == "data-eng-prod":
return "PROD"
return "LOCAL"


# Paths for local dev
current_file_path = Path(__file__)
@@ -23,25 +25,19 @@ def get_env():


if get_env() == "LOCAL":
replication_config = {
"source": "LOCAL",
"target": "DUCKDB",
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {
"format": "csv"
}
},
"streams": {
LOCATIONS_CSV_PATH: {
"object": "locations"
}
}
}
replication_config = {
"source": "LOCAL",
"target": "DUCKDB",
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {"format": "csv"},
},
"streams": {LOCATIONS_CSV_PATH: {"object": "locations"}},
}

sling_resource = SlingResource(
connections=[
sling_resource = SlingResource(
connections=[
SlingConnectionResource(
name="Local",
type="local",
@@ -55,7 +51,7 @@ def get_env():
schema="raw_data",
),
]
)
)

if get_env() != "LOCAL":
replication_config = {
@@ -64,36 +60,32 @@
"defaults": {
"mode": "full-refresh",
"object": "{stream_file_folder}.{stream_file_name}",
"source_options": {
"format": "csv"
}
"source_options": {"format": "csv"},
},
"streams": {
"s3://hooli-demo/embedded-elt/locations.csv": {
"object": "locations"
}
}
"s3://hooli-demo/embedded-elt/locations.csv": {"object": "locations"}
},
}

sling_resource = SlingResource(
connections=[
SlingConnectionResource(
name="S3",
type="s3",
bucket=EnvVar("AWS_S3_BUCKET"),
region=EnvVar("AWS_REGION"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
),
SlingConnectionResource(
name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE"),
database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH",
schema="RAW_DATA",
)
SlingConnectionResource(
name="S3",
type="s3",
bucket=EnvVar("AWS_S3_BUCKET"),
region=EnvVar("AWS_REGION"),
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
),
SlingConnectionResource(
name="SNOWFLAKE_PROD" if get_env() == "PROD" else "SNOWFLAKE_BRANCH",
type="snowflake",
host=EnvVar("SNOWFLAKE_HOST"),
user=EnvVar("SNOWFLAKE_USER"),
password=EnvVar("SNOWFLAKE_PASSWORD"),
role=EnvVar("SNOWFLAKE_ROLE"),
database="DEMO_DB2" if get_env() == "PROD" else "DEMO_DB2_BRANCH",
schema="RAW_DATA",
),
]
)
)
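The reformat does not touch the env-switching logic: get_env reads the Dagster Cloud environment variables at call time, and the module picks one replication_config / sling_resource pair at import time. A quick illustrative check:

# illustrative: exercising get_env() directly (configs are still chosen at import)
import os

os.environ.pop("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", None)
os.environ["DAGSTER_CLOUD_DEPLOYMENT_NAME"] = "data-eng-prod"
assert get_env() == "PROD"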
4 changes: 2 additions & 2 deletions hooli-data-ingest/hooli_data_ingest/schedules/__init__.py
@@ -3,6 +3,6 @@


daily_sling_assets = ScheduleDefinition(
job=daily_sling_job,
cron_schedule="0 0 * * *", # every day at midnight
job=daily_sling_job,
cron_schedule="0 0 * * *", # every day at midnight
)