Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add a task to dump dbt test results to s3 #1116

Merged
merged 6 commits into the base branch on
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "viadot2"
version = "2.1.28"
version = "2.1.29"
description = "A simple data ingestion library to guide data flows from some places to other places."
authors = [
{ name = "acivitillo", email = "[email protected]" },
Expand Down
39 changes: 33 additions & 6 deletions src/viadot/orchestration/prefect/flows/transform_and_catalog.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""Build specified dbt model(s) and upload the generated metadata to Luma."""

from datetime import datetime, timezone
from pathlib import Path
import shutil
from typing import Literal

from prefect import allow_failure, flow, task
from prefect import flow, task

from viadot.orchestration.prefect.tasks import clone_repo, dbt_task, luma_ingest_task
from viadot.orchestration.prefect.tasks import (
clone_repo,
dbt_task,
luma_ingest_task,
s3_upload_file,
)
from viadot.orchestration.prefect.utils import get_credentials


Expand Down Expand Up @@ -37,6 +43,7 @@ def transform_and_catalog( # noqa: PLR0913
luma_url: str = "http://localhost:8000",
luma_follow: bool = False,
metadata_kind: Literal["model", "model_run"] = "model_run",
run_results_s3_path: str | None = None,
) -> list[str]:
"""Build specified dbt model(s) and upload the generated metadata to Luma.

Expand Down Expand Up @@ -78,6 +85,9 @@ def transform_and_catalog( # noqa: PLR0913
response). By default, `False`.
metadata_kind (Literal["model", "model_run"], optional): The kind of metadata
to ingest. Defaults to "model_run".
        run_results_s3_path (str | None, optional): The S3 directory to upload the
            `run_results.json` file to. Note that a timestamp will be appended to the
            end of the file name. Defaults to None.

Returns:
list[str]: Lines from stdout of the `upload_metadata` task as a list.
Expand All @@ -96,6 +106,7 @@ def transform_and_catalog( # noqa: PLR0913
dbt_repo_url=my_dbt_repo_url
dbt_selects={"run": "staging"}
luma_url=my_luma_url,
run_results_s3_path="s3://my-bucket/dbt/run_results"
)
```

Expand Down Expand Up @@ -174,10 +185,26 @@ def transform_and_catalog( # noqa: PLR0913
wait_for=[upload_metadata_upstream_task],
)

if run_results_s3_path:
file_name = "run_results.json"
# Add a timestamp suffix.
timestamp = datetime.now(timezone.utc).timestamp()
run_results_s3_path = run_results_s3_path.rstrip("/") + "/"
run_results_s3_path += file_name + "_" + str(timestamp)

# Upload the file to s3.
dump_test_results_to_s3 = s3_upload_file(
from_path=str(dbt_target_dir_path / file_name),
to_path=run_results_s3_path,
wait_for=[upload_metadata_upstream_task],
)

# Cleanup.
remove_dbt_repo_dir(
dbt_repo_name,
wait_for=[allow_failure(upload_metadata)],
wait_for = (
[upload_metadata, dump_test_results_to_s3]
if run_results_s3_path
else [upload_metadata]
)
remove_dbt_repo_dir(dbt_repo_name, wait_for=wait_for)

return upload_metadata
return remove_dbt_repo_dir
Loading