diff --git a/src/integrations/prefect-dbt/tests/cli/test_commands.py b/src/integrations/prefect-dbt/tests/cli/test_commands.py
index 4a96e1c3d003..96add6efa380 100644
--- a/src/integrations/prefect-dbt/tests/cli/test_commands.py
+++ b/src/integrations/prefect-dbt/tests/cli/test_commands.py
@@ -1,19 +1,28 @@
+import datetime
 import os
 from pathlib import Path
 from unittest.mock import MagicMock
 
 import pytest
 import yaml
+from dbt.artifacts.resources.v1.components import FreshnessThreshold, Time
 from dbt.cli.main import DbtUsageException, dbtRunnerResult
 from dbt.contracts.files import FileHash
-from dbt.contracts.graph.nodes import ModelNode
-from dbt.contracts.results import RunExecutionResult, RunResult
+from dbt.contracts.graph.nodes import ModelNode, SourceDefinition
+from dbt.contracts.results import (
+    FreshnessMetadata,
+    FreshnessResult,
+    RunExecutionResult,
+    RunResult,
+    SourceFreshnessResult,
+)
 from prefect_dbt.cli.commands import (
     DbtCoreOperation,
     run_dbt_build,
     run_dbt_model,
     run_dbt_seed,
     run_dbt_snapshot,
+    run_dbt_source_freshness,
     run_dbt_test,
     trigger_dbt_cli_command,
 )
@@ -95,47 +104,118 @@ async def mock_dbt_runner_model_error():
 
 
 @pytest.fixture
-async def mock_dbt_runner_ls_success():
+async def mock_dbt_runner_freshness_success():
     return dbtRunnerResult(
-        success=True, exception=None, result=["example.example.test_model"]
+        success=True,
+        exception=None,
+        result=FreshnessResult(
+            results=[
+                SourceFreshnessResult(
+                    status="pass",
+                    thread_id="Thread-1 (worker)",
+                    execution_time=0.0,
+                    adapter_response={},
+                    message=None,
+                    failures=None,
+                    max_loaded_at=datetime.datetime.now(),
+                    snapshotted_at=datetime.datetime.now(),
+                    timing=[],
+                    node=SourceDefinition(
+                        database="test-123",
+                        schema="prefect_dbt_example",
+                        name="my_first_dbt_model",
+                        resource_type="source",
+                        package_name="prefect_dbt_bigquery",
+                        path="example/my_first_dbt_model.yml",
+                        original_file_path="models/example/my_first_dbt_model.yml",
+                        unique_id="source.prefect_dbt_bigquery.my_first_dbt_model",
+                        fqn=["prefect_dbt_bigquery", "example", "my_first_dbt_model"],
+                        source_name="prefect_dbt_source",
+                        source_description="",
+                        description="",
+                        loader="my_loader",
+                        identifier="my_identifier",
+                        freshness=FreshnessThreshold(
+                            warn_after=Time(count=12, period="hour"),
+                            error_after=Time(count=24, period="hour"),
+                            filter=None,
+                        ),
+                    ),
+                    age=0.0,
+                )
+            ],
+            elapsed_time=0.0,
+            metadata=FreshnessMetadata(
+                dbt_schema_version="https://schemas.getdbt.com/dbt/sources/v3.json",
+                dbt_version="1.1.1",
+                generated_at=datetime.datetime.now(),
+                invocation_id="invocation_id",
+                env={},
+            ),
+        ),
     )
 
 
 @pytest.fixture
-def mock_dbt_runner_source_freshness_success():
+async def mock_dbt_runner_freshness_error():
     return dbtRunnerResult(
-        success=True,
+        success=False,
         exception=None,
-        result=RunExecutionResult(
+        result=FreshnessResult(
             results=[
-                RunResult(
-                    status="pass",
-                    timing=None,
-                    thread_id="'Thread-1 (worker)'",
-                    message="Runtime Error",
+                SourceFreshnessResult(
+                    status="error",
+                    thread_id="Thread-1 (worker)",
+                    execution_time=0.0,
+                    adapter_response={},
+                    message=None,
                     failures=None,
-                    node=ModelNode(
+                    max_loaded_at=datetime.datetime.now(),
+                    snapshotted_at=datetime.datetime.now(),
+                    timing=[],
+                    node=SourceDefinition(
                         database="test-123",
                         schema="prefect_dbt_example",
                         name="my_first_dbt_model",
-                        resource_type="model",
+                        resource_type="source",
                         package_name="prefect_dbt_bigquery",
-                        path="example/my_first_dbt_model.sql",
-                        original_file_path="models/example/my_first_dbt_model.sql",
-                        unique_id="model.prefect_dbt_bigquery.my_first_dbt_model",
+                        path="example/my_first_dbt_model.yml",
+                        original_file_path="models/example/my_first_dbt_model.yml",
+                        unique_id="source.prefect_dbt_bigquery.my_first_dbt_model",
                         fqn=["prefect_dbt_bigquery", "example", "my_first_dbt_model"],
-                        alias="my_first_dbt_model",
-                        checksum=FileHash(name="sha256", checksum="123456789"),
+                        source_name="my_first_dbt_source",
+                        source_description="",
+                        description="",
+                        loader="my_loader",
+                        identifier="my_identifier",
+                        freshness=FreshnessThreshold(
+                            warn_after=Time(count=12, period="hour"),
+                            error_after=Time(count=24, period="hour"),
+                            filter=None,
+                        ),
                     ),
-                    execution_time=0.0,
-                    adapter_response=None,
+                    age=0.0,
                 )
             ],
             elapsed_time=0.0,
+            metadata=FreshnessMetadata(
+                dbt_schema_version="https://schemas.getdbt.com/dbt/sources/v3.json",
+                dbt_version="1.1.1",
+                generated_at=datetime.datetime.now(),
+                invocation_id="invocation_id",
+                env={},
+            ),
         ),
     )
 
 
+@pytest.fixture
+async def mock_dbt_runner_ls_success():
+    return dbtRunnerResult(
+        success=True, exception=None, result=["example.example.test_model"]
+    )
+
+
 @pytest.fixture
 def dbt_runner_model_result(monkeypatch, mock_dbt_runner_model_success):
     _mock_dbt_runner_invoke_success = MagicMock(
@@ -153,16 +233,24 @@ def dbt_runner_ls_result(monkeypatch, mock_dbt_runner_ls_success):
 
 
 @pytest.fixture
-def dbt_runner_source_freshness_result(
-    monkeypatch, mock_dbt_runner_source_freshness_success
-):
-    _mock_dbt_runner_source_freshness_result = MagicMock(
-        return_value=mock_dbt_runner_source_freshness_success
+def dbt_runner_freshness_error(monkeypatch, mock_dbt_runner_freshness_error):
+    _mock_dbt_runner_freshness_error = MagicMock(
+        return_value=mock_dbt_runner_freshness_error
+    )
+    monkeypatch.setattr(
+        "dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_freshness_error
+    )
+
+
+@pytest.fixture
+def dbt_runner_freshness_success(monkeypatch, mock_dbt_runner_freshness_success):
+    _mock_dbt_runner_freshness_success = MagicMock(
+        return_value=mock_dbt_runner_freshness_success
     )
     monkeypatch.setattr(
-        "dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_source_freshness_result
+        "dbt.cli.main.dbtRunner.invoke", _mock_dbt_runner_freshness_success
    )
-    return _mock_dbt_runner_source_freshness_result
+    return _mock_dbt_runner_freshness_success
 
 
 @pytest.fixture
@@ -196,6 +284,39 @@ def test_flow():
     assert isinstance(result, dbtRunnerResult)
 
 
+def test_trigger_dbt_cli_command_cli_argument_list(
+    profiles_dir, dbt_cli_profile_bare, dbt_runner_freshness_success
+):
+    @flow
+    def test_flow():
+        return trigger_dbt_cli_command(
+            command="dbt source freshness",
+            profiles_dir=profiles_dir,
+            dbt_cli_profile=dbt_cli_profile_bare,
+        )
+
+    test_flow()
+    dbt_runner_freshness_success.assert_called_with(
+        ["source", "freshness", "--profiles-dir", profiles_dir]
+    )
+
+
+@pytest.mark.usefixtures("dbt_runner_freshness_error")
+def test_trigger_dbt_cli_command_failed(profiles_dir, dbt_cli_profile_bare):
+    @flow
+    def test_flow():
+        return trigger_dbt_cli_command(
+            command="dbt source freshness",
+            profiles_dir=profiles_dir,
+            dbt_cli_profile=dbt_cli_profile_bare,
+        )
+
+    with pytest.raises(
+        Exception, match="dbt task result success: False with exception: None"
+    ):
+        test_flow()
+
+
 @pytest.mark.usefixtures("dbt_runner_ls_result")
 def test_trigger_dbt_cli_command_run_twice_overwrite(
     profiles_dir, dbt_cli_profile, dbt_cli_profile_bare
@@ -264,7 +385,7 @@ def test_flow():
         )
 
     with pytest.raises(
-        ValueError, match="Provide `dbt_cli_profile` keyword for writing profiles"
+        ValueError, match="Profile not found. Provide `dbt_cli_profile` or"
     ):
         test_flow()
 
@@ -328,24 +449,6 @@ def test_flow():
     assert isinstance(result, dbtRunnerResult)
 
 
-def test_trigger_dbt_cli_multi_word_command(
-    profiles_dir, dbt_cli_profile_bare, dbt_runner_source_freshness_result
-):
-    @flow
-    def test_flow():
-        return trigger_dbt_cli_command(
-            "dbt source freshness",
-            profiles_dir=profiles_dir,
-            dbt_cli_profile=dbt_cli_profile_bare,
-        )
-
-    result = test_flow()
-    assert isinstance(result, dbtRunnerResult)
-    dbt_runner_source_freshness_result.assert_called_once_with(
-        ["source", "freshness", "--profiles-dir", profiles_dir]
-    )
-
-
 class TestDbtCoreOperation:
     @pytest.fixture
     def mock_open_process(self, monkeypatch):
@@ -426,10 +529,6 @@ def test_find_valid_profiles_dir_path_without_profile(self):
         with pytest.raises(ValueError, match="Since overwrite_profiles is True"):
             DbtCoreOperation(commands=["dbt debug"], profiles_dir=Path("fake")).run()
 
-    def test_process_commands_not_dbt(self):
-        with pytest.raises(ValueError, match="None of the commands"):
-            assert DbtCoreOperation(commands=["ls"])
-
     def test_append_dirs_to_commands(
         self,
         tmp_path,
@@ -459,18 +558,18 @@ def test_append_dirs_to_commands(
 
 
 @pytest.mark.usefixtures("dbt_runner_model_result")
-def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_build_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_build(
+    async def test_flow():
+        return await run_dbt_build(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
             create_summary_artifact=True,
         )
 
-    test_flow()
-    assert (a := Artifact.get(key="foo"))
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt build Task Summary")
     assert "my_first_dbt_model" in a.data
@@ -478,18 +577,18 @@ def test_flow():
 
 
 @pytest.mark.usefixtures("dbt_runner_model_result")
-def test_run_dbt_test_creates_artifact(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_test_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_test(
+    async def test_flow():
+        return await run_dbt_test(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
             create_summary_artifact=True,
         )
 
-    test_flow()
-    assert (a := Artifact.get(key="foo"))
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt test Task Summary")
     assert "my_first_dbt_model" in a.data
@@ -497,18 +596,18 @@ def test_flow():
 
 
 @pytest.mark.usefixtures("dbt_runner_model_result")
-def test_run_dbt_snapshot_creates_artifact(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_snapshot_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_snapshot(
+    async def test_flow():
+        return await run_dbt_snapshot(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
             create_summary_artifact=True,
         )
 
-    test_flow()
-    assert (a := Artifact.get(key="foo"))
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt snapshot Task Summary")
     assert "my_first_dbt_model" in a.data
@@ -516,18 +615,18 @@ def test_flow():
 
 
 @pytest.mark.usefixtures("dbt_runner_model_result")
-def test_run_dbt_seed_creates_artifact(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_seed_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_seed(
+    async def test_flow():
+        return await run_dbt_seed(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
             create_summary_artifact=True,
         )
 
-    test_flow()
-    assert (a := Artifact.get(key="foo"))
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt seed Task Summary")
     assert "my_first_dbt_model" in a.data
@@ -535,24 +634,45 @@ def test_flow():
 
 
 @pytest.mark.usefixtures("dbt_runner_model_result")
-def test_run_dbt_model_creates_artifact(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_model_creates_artifact(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_model(
+    async def test_flow():
+        return await run_dbt_model(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
             create_summary_artifact=True,
         )
 
-    test_flow()
-    assert (a := Artifact.get(key="foo"))
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt run Task Summary")
     assert "my_first_dbt_model" in a.data
     assert "Successful Nodes" in a.data
 
 
+@pytest.mark.usefixtures("dbt_runner_freshness_success")
+async def test_run_dbt_source_freshness_creates_artifact(
+    profiles_dir, dbt_cli_profile_bare
+):
+    @flow
+    async def test_flow():
+        return await run_dbt_source_freshness(
+            profiles_dir=profiles_dir,
+            dbt_cli_profile=dbt_cli_profile_bare,
+            summary_artifact_key="foo",
+            create_summary_artifact=True,
+        )
+
+    await test_flow()
+    assert (a := await Artifact.get(key="foo"))
+    assert a.type == "markdown"
+    assert a.data.startswith("# dbt source freshness Task Summary")
+    assert "my_first_dbt_model" in a.data
+    assert "Successful Nodes" in a.data
+
+
 @pytest.fixture
 def dbt_runner_model_error(monkeypatch, mock_dbt_runner_model_error):
     _mock_dbt_runner_invoke_error = MagicMock(return_value=mock_dbt_runner_model_error)
@@ -560,12 +680,12 @@ def dbt_runner_model_error(monkeypatch, mock_dbt_runner_model_error):
 
 
 @pytest.mark.usefixtures("dbt_runner_model_error")
-def test_run_dbt_model_creates_unsuccessful_artifact(
+async def test_run_dbt_model_creates_unsuccessful_artifact(
     profiles_dir, dbt_cli_profile_bare
 ):
     @flow
-    def test_flow():
-        return run_dbt_model(
+    async def test_flow():
+        return await run_dbt_model(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
@@ -573,21 +693,45 @@ def test_flow():
         )
 
     with pytest.raises(
-        Exception, match="dbt task result unsuccessful with exception: None"
+        Exception, match="dbt task result success: False with exception: None"
     ):
-        test_flow()
-    assert (a := Artifact.get(key="foo"))
+        await test_flow()
+    assert (a := await Artifact.get(key="foo"))
     assert a.type == "markdown"
     assert a.data.startswith("# dbt run Task Summary")
     assert "my_first_dbt_model" in a.data
     assert "Unsuccessful Nodes" in a.data
 
 
+@pytest.mark.usefixtures("dbt_runner_freshness_error")
+async def test_run_dbt_source_freshness_creates_unsuccessful_artifact(
+    profiles_dir, dbt_cli_profile_bare
+):
+    @flow
+    async def test_flow():
+        return await run_dbt_source_freshness(
+            profiles_dir=profiles_dir,
+            dbt_cli_profile=dbt_cli_profile_bare,
+            summary_artifact_key="foo",
+            create_summary_artifact=True,
+        )
+
+    with pytest.raises(
+        Exception, match="dbt task result success: False with exception: None"
+    ):
+        await test_flow()
+    assert (a := await Artifact.get(key="foo"))
+    assert a.type == "markdown"
+    assert a.data.startswith("# dbt source freshness Task Summary")
+    assert "my_first_dbt_model" in a.data
+    assert "Unsuccessful Nodes" in a.data
+
+
 @pytest.mark.usefixtures("dbt_runner_failed_result")
-def test_run_dbt_model_throws_error(profiles_dir, dbt_cli_profile_bare):
+async def test_run_dbt_model_throws_error(profiles_dir, dbt_cli_profile_bare):
     @flow
-    def test_flow():
-        return run_dbt_model(
+    async def test_flow():
+        return await run_dbt_model(
             profiles_dir=profiles_dir,
             dbt_cli_profile=dbt_cli_profile_bare,
             summary_artifact_key="foo",
@@ -595,4 +739,4 @@ def test_flow():
         )
 
     with pytest.raises(DbtUsageException, match="No such command 'weeeeeee'."):
-        test_flow()
+        await test_flow()