diff --git a/.secrets.baseline b/.secrets.baseline
index 16a987f3f..011473465 100644
--- a/.secrets.baseline
+++ b/.secrets.baseline
@@ -90,10 +90,6 @@
     {
       "path": "detect_secrets.filters.allowlist.is_line_allowlisted"
     },
-    {
-      "path": "detect_secrets.filters.common.is_baseline_file",
-      "filename": ".secrets.baseline"
-    },
     {
       "path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies",
       "min_level": 2
@@ -212,13 +208,22 @@
         "line_number": 38
       }
     ],
+    "src/viadot/orchestration/prefect/flows/transform_and_catalog.py": [
+      {
+        "type": "Secret Keyword",
+        "filename": "src/viadot/orchestration/prefect/flows/transform_and_catalog.py",
+        "hashed_secret": "d0e35bfe1137a35c68a12d8ad06ee2636ef75e21",
+        "is_verified": false,
+        "line_number": 116
+      }
+    ],
     "src/viadot/orchestration/prefect/tasks/s3.py": [
       {
         "type": "Secret Keyword",
         "filename": "src/viadot/orchestration/prefect/tasks/s3.py",
         "hashed_secret": "ad3a464fefa681e5db5d5002646c1cd25df15c06",
         "is_verified": false,
-        "line_number": 45
+        "line_number": 52
       }
     ],
     "tests/integration/orchestration/prefect/flows/test_duckdb_to_sql_server.py": [
@@ -397,5 +402,5 @@
       }
     ]
   },
-  "generated_at": "2024-10-03T09:38:32Z"
+  "generated_at": "2025-01-17T12:04:01Z"
 }
diff --git a/pyproject.toml b/pyproject.toml
index 01e902270..9d519e944 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "viadot2"
-version = "2.1.28"
+version = "2.1.29"
 description = "A simple data ingestion library to guide data flows from some places to other places."
 authors = [
     { name = "acivitillo", email = "acivitillo@dyvenia.com" },
diff --git a/src/viadot/orchestration/prefect/flows/transform_and_catalog.py b/src/viadot/orchestration/prefect/flows/transform_and_catalog.py
index cd3c5b4b1..49d36b536 100644
--- a/src/viadot/orchestration/prefect/flows/transform_and_catalog.py
+++ b/src/viadot/orchestration/prefect/flows/transform_and_catalog.py
@@ -1,12 +1,18 @@
 """Build specified dbt model(s) and upload the generated metadata to Luma."""
 
+from datetime import datetime, timezone
 from pathlib import Path
 import shutil
 from typing import Literal
 
-from prefect import allow_failure, flow, task
+from prefect import flow, task
 
-from viadot.orchestration.prefect.tasks import clone_repo, dbt_task, luma_ingest_task
+from viadot.orchestration.prefect.tasks import (
+    clone_repo,
+    dbt_task,
+    luma_ingest_task,
+    s3_upload_file,
+)
 from viadot.orchestration.prefect.utils import get_credentials
 
 
@@ -37,6 +43,9 @@ def transform_and_catalog(  # noqa: PLR0913
     luma_url: str = "http://localhost:8000",
     luma_follow: bool = False,
     metadata_kind: Literal["model", "model_run"] = "model_run",
+    run_results_storage_path: str | None = None,
+    run_results_storage_config_key: str | None = None,
+    run_results_storage_credentials_secret: str | None = None,
 ) -> list[str]:
     """Build specified dbt model(s) and upload the generated metadata to Luma.
 
@@ -78,6 +87,13 @@ def transform_and_catalog(  # noqa: PLR0913
             response). By default, `False`.
         metadata_kind (Literal["model", "model_run"], optional): The kind of metadata
             to ingest. Defaults to "model_run".
+        run_results_storage_path (str, optional): The directory to upload the
+            `run_results.json` file to. Note that a timestamp will be appended to
+            the file name. Currently, only S3 is supported. Defaults to None.
+        run_results_storage_config_key (str, optional): The key in the viadot config
+            holding AWS credentials. Defaults to None.
+        run_results_storage_credentials_secret (str, optional): The name of the secret
+            block in Prefect holding AWS credentials. Defaults to None.
 
     Returns:
         list[str]: Lines from stdout of the `upload_metadata` task as a list.
@@ -96,6 +112,8 @@ def transform_and_catalog(  # noqa: PLR0913
             dbt_repo_url=my_dbt_repo_url
             dbt_selects={"run": "staging"}
             luma_url=my_luma_url,
+            run_results_storage_path="s3://my-bucket/dbt/run_results",
+            run_results_storage_credentials_secret="my-aws-credentials-block",
         )
         ```
 
@@ -174,10 +192,37 @@ def transform_and_catalog(  # noqa: PLR0913
         wait_for=[upload_metadata_upstream_task],
     )
 
+    if run_results_storage_path:
+        # Set the file path to include date info.
+        file_name = "run_results.json"
+        now = datetime.now(timezone.utc)
+        run_results_storage_path = run_results_storage_path.rstrip("/") + "/"
+
+        # Add date partitioning.
+        date_str = now.strftime("%Y%m%d")
+        run_results_storage_path += date_str + "/"
+
+        # Add a timestamp suffix, e.g. run_results_1737556947.934292.json.
+        timestamp = now.timestamp()
+        run_results_storage_path += (
+            Path(file_name).stem + "_" + str(timestamp) + ".json"
+        )
+
+        # Upload the file to S3.
+        dump_run_results_to_s3 = s3_upload_file(
+            from_path=str(dbt_target_dir_path / file_name),
+            to_path=run_results_storage_path,
+            wait_for=[upload_metadata_upstream_task],
+            config_key=run_results_storage_config_key,
+            credentials_secret=run_results_storage_credentials_secret,
+        )
+
     # Cleanup.
-    remove_dbt_repo_dir(
-        dbt_repo_name,
-        wait_for=[allow_failure(upload_metadata)],
+    wait_for = (
+        [upload_metadata, dump_run_results_to_s3]
+        if run_results_storage_path
+        else [upload_metadata]
     )
+    remove_dbt_repo_dir(dbt_repo_name, wait_for=wait_for)
 
     return upload_metadata
diff --git a/src/viadot/orchestration/prefect/tasks/s3.py b/src/viadot/orchestration/prefect/tasks/s3.py
index 9816ab595..74fcaedde 100644
--- a/src/viadot/orchestration/prefect/tasks/s3.py
+++ b/src/viadot/orchestration/prefect/tasks/s3.py
@@ -6,6 +6,10 @@
 from prefect import task
 from prefect.logging import get_run_logger
 
+from viadot.config import get_source_credentials
+from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError
+from viadot.orchestration.prefect.utils import get_credentials
+
 with contextlib.suppress(ImportError):
     from viadot.sources import S3
 
@@ -17,6 +21,7 @@ def s3_upload_file(
     from_path: str,
     to_path: str,
     credentials: dict[str, Any] | None = None,
     config_key: str | None = None,
+    credentials_secret: str | None = None,
 ) -> None:
     """Task to upload a file to Amazon S3.
@@ -27,6 +32,8 @@
             Defaults to None.
         config_key (str, optional): The key in the viadot config holding relevant
             credentials. Defaults to None.
+        credentials_secret (str, optional): The name of a secret block in Prefect
+            that stores AWS credentials. Defaults to None.
 
     Example:
         ```python
@@ -49,6 +56,15 @@ def test_flow():
         test_flow()
         ```
     """
+    if not (credentials or credentials_secret or config_key):
+        raise MissingSourceCredentialsError
+
+    credentials = (
+        credentials
+        or get_source_credentials(config_key)
+        or get_credentials(credentials_secret)
+    )
+
     s3 = S3(credentials=credentials, config_key=config_key)
 
     s3.upload(from_path=from_path, to_path=to_path)
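
For reference, the storage-path logic added to `transform_and_catalog` composes a date-partitioned, timestamped S3 key. Below is a minimal standalone sketch of that layout; the bucket and prefix names are hypothetical placeholders.

```python
# Standalone sketch of the key layout produced by the flow change above.
from datetime import datetime, timezone
from pathlib import Path


def build_run_results_key(
    storage_path: str, file_name: str = "run_results.json"
) -> str:
    """Mirror the flow's logic: <prefix>/<YYYYMMDD>/run_results_<unix_ts>.json."""
    now = datetime.now(timezone.utc)
    key = storage_path.rstrip("/") + "/"  # Normalize the trailing slash.
    key += now.strftime("%Y%m%d") + "/"  # Date partition, e.g. "20250117/".
    # Timestamp suffix, e.g. "run_results_1737556947.934292.json".
    key += Path(file_name).stem + "_" + str(now.timestamp()) + ".json"
    return key


print(build_run_results_key("s3://my-bucket/dbt/run_results"))
# e.g. s3://my-bucket/dbt/run_results/20250117/run_results_1737556947.934292.json
```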
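The extended `s3_upload_file` task resolves AWS credentials from one of three sources: an explicit `credentials` dict, a viadot `config_key`, or a Prefect `credentials_secret` block. A hedged usage sketch of the secret-block path follows; the block name and file paths are hypothetical.

```python
# Usage sketch for the updated task; block name and paths are placeholders.
from prefect import flow

from viadot.orchestration.prefect.tasks import s3_upload_file


@flow
def upload_run_results() -> None:
    s3_upload_file(
        from_path="dbt_project/target/run_results.json",
        to_path="s3://my-bucket/dbt/run_results/run_results.json",
        # AWS credentials are read from this Prefect secret block at runtime.
        credentials_secret="my-aws-credentials-block",
    )


if __name__ == "__main__":
    upload_run_results()
```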