Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add a task to dump dbt test results to s3 #1116

Merged
merged 6 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@
{
"path": "detect_secrets.filters.allowlist.is_line_allowlisted"
},
{
"path": "detect_secrets.filters.common.is_baseline_file",
"filename": ".secrets.baseline"
},
{
"path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies",
"min_level": 2
Expand Down Expand Up @@ -212,13 +208,22 @@
"line_number": 38
}
],
"src/viadot/orchestration/prefect/flows/transform_and_catalog.py": [
{
"type": "Secret Keyword",
"filename": "src/viadot/orchestration/prefect/flows/transform_and_catalog.py",
"hashed_secret": "d0e35bfe1137a35c68a12d8ad06ee2636ef75e21",
"is_verified": false,
"line_number": 116
}
],
"src/viadot/orchestration/prefect/tasks/s3.py": [
{
"type": "Secret Keyword",
"filename": "src/viadot/orchestration/prefect/tasks/s3.py",
"hashed_secret": "ad3a464fefa681e5db5d5002646c1cd25df15c06",
"is_verified": false,
"line_number": 45
"line_number": 52
}
],
"tests/integration/orchestration/prefect/flows/test_duckdb_to_sql_server.py": [
Expand Down Expand Up @@ -397,5 +402,5 @@
}
]
},
"generated_at": "2024-10-03T09:38:32Z"
"generated_at": "2025-01-17T12:04:01Z"
}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "viadot2"
version = "2.1.28"
version = "2.1.29"
description = "A simple data ingestion library to guide data flows from some places to other places."
authors = [
{ name = "acivitillo", email = "[email protected]" },
Expand Down
57 changes: 51 additions & 6 deletions src/viadot/orchestration/prefect/flows/transform_and_catalog.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""Build specified dbt model(s) and upload the generated metadata to Luma."""

from datetime import datetime, timezone
from pathlib import Path
import shutil
from typing import Literal

from prefect import allow_failure, flow, task
from prefect import flow, task

from viadot.orchestration.prefect.tasks import clone_repo, dbt_task, luma_ingest_task
from viadot.orchestration.prefect.tasks import (
clone_repo,
dbt_task,
luma_ingest_task,
s3_upload_file,
)
from viadot.orchestration.prefect.utils import get_credentials


Expand Down Expand Up @@ -37,6 +43,9 @@ def transform_and_catalog( # noqa: PLR0913
luma_url: str = "http://localhost:8000",
luma_follow: bool = False,
metadata_kind: Literal["model", "model_run"] = "model_run",
run_results_storage_path: str | None = None,
run_results_storage_config_key: str | None = None,
run_results_storage_credentials_secret: str | None = None,
) -> list[str]:
"""Build specified dbt model(s) and upload the generated metadata to Luma.

Expand Down Expand Up @@ -78,6 +87,13 @@ def transform_and_catalog( # noqa: PLR0913
response). By default, `False`.
metadata_kind (Literal["model", "model_run"], optional): The kind of metadata
to ingest. Defaults to "model_run".
run_results_storage_path (str, optional): The directory to upload the
`run_results.json` file to. Note that a timestamp will be appended to the
end of the file. Currently, only S3 is supported. Defaults to None.
run_results_storage_config_key (str, optional): The key in the viadot config
holding AWS credentials. Defaults to None.
run_results_storage_credentials_secret (str, optional): The name of the secret
block in Prefect holding AWS credentials. Defaults to None.

Returns:
list[str]: Lines from stdout of the `upload_metadata` task as a list.
Expand All @@ -96,6 +112,8 @@ def transform_and_catalog( # noqa: PLR0913
dbt_repo_url=my_dbt_repo_url
dbt_selects={"run": "staging"}
luma_url=my_luma_url,
run_results_storage_path="s3://my-bucket/dbt/run_results",
run_results_storage_credentials_secret="my-aws-credentials-block",
)
```

Expand Down Expand Up @@ -174,10 +192,37 @@ def transform_and_catalog( # noqa: PLR0913
wait_for=[upload_metadata_upstream_task],
)

if run_results_storage_path:
# Set the file path to include date info.
file_name = "run_results.json"
now = datetime.now(timezone.utc)
run_results_storage_path = run_results_storage_path.rstrip("/") + "/"

# Add partitioning.
date_str = now.strftime("%Y%m%d")
run_results_storage_path += date_str + "/"

    # Add timestamp suffix, e.g., run_results_1737556947.934292.json.
timestamp = now.timestamp()
run_results_storage_path += (
Path(file_name).stem + "_" + str(timestamp) + ".json"
)

    # Upload the file to S3.
dump_test_results_to_s3 = s3_upload_file(
from_path=str(dbt_target_dir_path / file_name),
to_path=run_results_storage_path,
wait_for=[upload_metadata_upstream_task],
config_key=run_results_storage_config_key,
credentials_secret=run_results_storage_credentials_secret,
)

# Cleanup.
remove_dbt_repo_dir(
dbt_repo_name,
wait_for=[allow_failure(upload_metadata)],
wait_for = (
[upload_metadata, dump_test_results_to_s3]
if run_results_storage_path
else [upload_metadata]
)
remove_dbt_repo_dir(dbt_repo_name, wait_for=wait_for)

return upload_metadata
return remove_dbt_repo_dir
14 changes: 14 additions & 0 deletions src/viadot/orchestration/prefect/tasks/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from prefect import task
from prefect.logging import get_run_logger

from viadot.config import get_source_credentials
from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
from viadot.orchestration.prefect.utils import get_credentials


with contextlib.suppress(ImportError):
from viadot.sources import S3
Expand All @@ -17,6 +21,7 @@ def s3_upload_file(
to_path: str,
credentials: dict[str, Any] | None = None,
config_key: str | None = None,
credentials_secret: str | None = None,
) -> None:
"""Task to upload a file to Amazon S3.

Expand All @@ -27,6 +32,8 @@ def s3_upload_file(
Defaults to None.
config_key (str, optional): The key in the viadot config holding relevant
credentials. Defaults to None.
credentials_secret (str, optional): The name of a secret block in Prefect
that stores AWS credentials. Defaults to None.

Example:
```python
Expand All @@ -49,6 +56,13 @@ def test_flow():
test_flow()
```
"""
if not (credentials_secret or config_key):
raise MissingSourceCredentialsError

credentials = get_source_credentials(config_key) or get_credentials(
credentials_secret
)

s3 = S3(credentials=credentials, config_key=config_key)

s3.upload(from_path=from_path, to_path=to_path)
Expand Down
Loading