Skip to content

Commit

Permalink
Merge branch 'main' into fix_9534
Browse files Browse the repository at this point in the history
  • Loading branch information
aranke authored Apr 15, 2024
2 parents 1272e1e + 7e72cac commit 02a545c
Show file tree
Hide file tree
Showing 18 changed files with 1,065 additions and 586 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240404-170728.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: 'source freshness precomputes metadata-based freshness in batch, if possible '
time: 2024-04-04T17:07:28.717868-07:00
custom:
Author: michelleark
Issue: "8705"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240409-233347.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Begin warning people about spaces in model names
time: 2024-04-09T23:33:47.850166-07:00
custom:
Author: QMalcolm
Issue: "9397"
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240412-095718.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Disambiguate FreshnessConfigProblem error message
time: 2024-04-12T09:57:18.417882-07:00
custom:
Author: michelleark
Issue: "9891"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240412-134502.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Migrate to using `error_tag` provided by `dbt-common`
time: 2024-04-12T13:45:02.879023-07:00
custom:
Author: QMalcolm
Issue: "9914"
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ jobs:
- name: Install source distributions
# ignore dbt-1.0.0, which intentionally raises an error when installed from source
run: |
find ./dist/dbt-[a-z]*.gz -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/
find ./dist/*.gz -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/
- name: Check source distributions
run: |
Expand Down
6 changes: 5 additions & 1 deletion core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def validate(cls, data):

@dataclass
class ProjectFlags(ExtensibleDbtClassMixin):
allow_spaces_in_model_names: Optional[bool] = True
cache_selected_only: Optional[bool] = None
debug: Optional[bool] = None
fail_fast: Optional[bool] = None
Expand All @@ -320,7 +321,10 @@ class ProjectFlags(ExtensibleDbtClassMixin):

@property
def project_only_flags(self) -> Dict[str, Any]:
return {"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks}
return {
"source_freshness_run_project_hooks": self.source_freshness_run_project_hooks,
"allow_spaces_in_model_names": self.allow_spaces_in_model_names,
}


@dataclass
Expand Down
24 changes: 24 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,30 @@ message ProjectFlagsMovedDeprecationMsg {
ProjectFlagsMovedDeprecation data = 2;
}

// D014
// Payload of the deprecation event emitted for a single model whose name contains spaces.
message SpacesInModelNameDeprecation {
// Name of the offending model.
string model_name = 1;
// Model version as a string; the message formatter omits the ".v<version>" suffix when this is empty.
string model_version = 2;
// Event level value supplied by the emitter; the message formatter compares it against the
// ERROR and WARN level values to decide which tag (if any) to prepend.
string level = 3;
}

// Envelope that pairs the common event info with a SpacesInModelNameDeprecation payload.
message SpacesInModelNameDeprecationMsg {
CoreEventInfo info = 1;
SpacesInModelNameDeprecation data = 2;
}

// D015
// Payload of the summary deprecation event reporting how many models have spaces in their names.
message TotalModelNamesWithSpacesDeprecation {
// Number of models whose names contain spaces.
int32 count_invalid_names = 1;
// When true, the formatted message ends with a hint to re-run with `--debug` to list every model.
bool show_debug_hint = 2;
// Event level value supplied by the emitter; the message formatter compares it against the
// ERROR and WARN level values to decide which tag (if any) to prepend.
string level = 3;
}

// Envelope that pairs the common event info with a TotalModelNamesWithSpacesDeprecation payload.
message TotalModelNamesWithSpacesDeprecationMsg {
CoreEventInfo info = 1;
TotalModelNamesWithSpacesDeprecation data = 2;
}

// I065
message DeprecatedModel {
string model_name = 1;
Expand Down
1,146 changes: 577 additions & 569 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

39 changes: 38 additions & 1 deletion core/dbt/events/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json

from dbt.constants import MAXIMUM_SEED_SIZE_NAME, PIN_PACKAGE_URL
from dbt_common.ui import warning_tag, line_wrap_message, green, yellow, red
from dbt_common.ui import error_tag, warning_tag, line_wrap_message, green, yellow, red
from dbt_common.events.base_types import EventLevel
from dbt_common.events.format import (
format_fancy_output_line,
Expand Down Expand Up @@ -413,6 +413,43 @@ def message(self) -> str:
return warning_tag(f"Deprecated functionality\n\n{description}")


class SpacesInModelNameDeprecation(DynamicLevel):
    """Deprecation event for a single model whose name contains spaces (code D014)."""

    def code(self) -> str:
        """Return the event code."""
        return "D014"

    def message(self) -> str:
        """Build the line-wrapped message, tagged according to ``self.level``."""
        # A versioned model is rendered as `<name>.v<version>`; unversioned models get no suffix.
        suffix = ""
        if self.model_version:
            suffix = ".v" + self.model_version

        text = (
            f"Model `{self.model_name}{suffix}` has spaces in its name. This is deprecated and "
            "may cause errors when using dbt."
        )

        # Only the ERROR and WARN level values add a tag; any other level is emitted untagged.
        if self.level == EventLevel.ERROR.value:
            return line_wrap_message(error_tag(text))
        if self.level == EventLevel.WARN.value:
            return line_wrap_message(warning_tag(text))
        return line_wrap_message(text)


class TotalModelNamesWithSpacesDeprecation(DynamicLevel):
    """Summary deprecation event counting the models with spaces in their names (code D015)."""

    def code(self) -> str:
        """Return the event code."""
        return "D015"

    def message(self) -> str:
        """Build the line-wrapped summary message, tagged according to ``self.level``."""
        pieces = [
            f"Spaces in model names found in {self.count_invalid_names} model(s), which is deprecated."
        ]
        # The hint is only useful when not every offending model was already listed.
        if self.show_debug_hint:
            pieces.append(" Run again with `--debug` to see them all.")
        text = "".join(pieces)

        # Only the ERROR and WARN level values add a tag; any other level is emitted untagged.
        if self.level == EventLevel.ERROR.value:
            return line_wrap_message(error_tag(text))
        if self.level == EventLevel.WARN.value:
            return line_wrap_message(warning_tag(text))
        return line_wrap_message(text)


# =======================================================
# I - Project parsing
# =======================================================
Expand Down
45 changes: 45 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dbt.context.manifest import generate_query_header_context
from dbt.contracts.graph.semantic_manifest import SemanticManifest
from dbt_common.events.base_types import EventLevel
from dbt_common.exceptions.base import DbtValidationError
import dbt_common.utils
import json
import pprint
Expand Down Expand Up @@ -62,6 +63,8 @@
StateCheckVarsHash,
DeprecatedModel,
DeprecatedReference,
SpacesInModelNameDeprecation,
TotalModelNamesWithSpacesDeprecation,
UpcomingReferenceDeprecation,
)
from dbt.logger import DbtProcessState
Expand Down Expand Up @@ -520,6 +523,7 @@ def load(self) -> Manifest:
self.write_manifest_for_partial_parse()

self.check_for_model_deprecations()
self.check_for_spaces_in_model_names()

return self.manifest

Expand Down Expand Up @@ -621,6 +625,47 @@ def check_for_model_deprecations(self):
)
)

def check_for_spaces_in_model_names(self):
    """Report models whose names contain spaces, failing the parse when spaces are disallowed.

    A `SpacesInModelNameDeprecation` event is fired for the first offending model only,
    unless the `DEBUG` flag is set, in which case one is fired for every offending model.
    When at least one offender exists, a `TotalModelNamesWithSpacesDeprecation` summary
    event follows.

    All events use `WARN` level when `ALLOW_SPACES_IN_MODEL_NAMES` is truthy and `ERROR`
    level otherwise.

    Raises:
        DbtValidationError: at `ERROR` level, when at least one model name contains a space.
    """
    args = self.root_project.args
    if args.ALLOW_SPACES_IN_MODEL_NAMES:
        level = EventLevel.WARN
    else:
        level = EventLevel.ERROR

    offenders = [
        node
        for node in self.manifest.nodes.values()
        if isinstance(node, ModelNode) and " " in node.name
    ]
    if not offenders:
        return

    debug = args.DEBUG
    # Without DEBUG only the first offender is named; the summary below carries the total.
    reported = offenders if debug else offenders[:1]
    for model in reported:
        fire_event(
            SpacesInModelNameDeprecation(
                model_name=model.name,
                model_version=version_to_str(model.version),
                level=level.value,
            ),
            level=level,
        )

    fire_event(
        TotalModelNamesWithSpacesDeprecation(
            count_invalid_names=len(offenders),
            show_debug_hint=(not debug),
            level=level.value,
        ),
        level=level,
    )

    if level == EventLevel.ERROR:
        raise DbtValidationError("Model names cannot contain spaces")

def load_and_parse_macros(self, project_parser_files):
for project in self.all_projects.values():
if project.project_name not in project_parser_files:
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def parse_source(self, target: UnpatchedSourceDefinition) -> SourceDefinition:
# runtime.
fire_event(
FreshnessConfigProblem(
msg=f"The configured adapter does not support metadata-based freshness. A loaded_at_field must be specified for source '{source.name}'."
msg=f"The configured adapter does not support metadata-based freshness. A loaded_at_field must be specified for source '{source.name}.{table.name}'."
)
)

Expand Down
79 changes: 73 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import threading
import time
from typing import Optional, List
from typing import Optional, List, AbstractSet, Dict

from .base import BaseRunner
from .printer import (
Expand All @@ -28,6 +28,8 @@

from dbt.adapters.capability import Capability
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.base.impl import FreshnessResponse
from dbt.contracts.graph.nodes import SourceDefinition, HookNode
from dbt_common.events.base_types import EventLevel
from dbt.graph import ResourceTypeSelector
Expand All @@ -36,6 +38,15 @@


class FreshnessRunner(BaseRunner):
def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
    """Initialise the runner with an empty metadata freshness cache.

    All arguments are forwarded unchanged to the parent initialiser.
    """
    super().__init__(config, adapter, node, node_index, num_nodes)
    # Relation -> freshness lookup, empty until set_metadata_freshness_cache() replaces it.
    # NOTE(review): execute() assigns cached values to a variable annotated
    # Optional[FreshnessResponse]; confirm whether FreshnessResult is the intended value type.
    self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def set_metadata_freshness_cache(
    self, metadata_freshness_cache: Dict[BaseRelation, FreshnessResult]
) -> None:
    """Replace this runner's metadata freshness cache with the given mapping.

    The mapping is stored by reference, not copied.
    """
    self._metadata_freshness_cache = metadata_freshness_cache

def on_skip(self):
    """Reject skipping: always raises.

    Raises:
        DbtRuntimeError: unconditionally.
    """
    raise DbtRuntimeError("Freshness: nodes cannot be skipped!")

Expand Down Expand Up @@ -105,7 +116,7 @@ def execute(self, compiled_node, manifest):
with self.adapter.connection_named(compiled_node.unique_id, compiled_node):
self.adapter.clear_transaction()
adapter_response: Optional[AdapterResponse] = None
freshness = None
freshness: Optional[FreshnessResponse] = None

if compiled_node.loaded_at_field is not None:
adapter_response, freshness = self.adapter.calculate_freshness(
Expand All @@ -125,10 +136,14 @@ def execute(self, compiled_node, manifest):
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
macro_resolver=manifest,
)
metadata_source = self.adapter.Relation.create_from(self.config, compiled_node)
if metadata_source in self._metadata_freshness_cache:
freshness = self._metadata_freshness_cache[metadata_source]
else:
adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
relation,
macro_resolver=manifest,
)

status = compiled_node.freshness.status(freshness["age"])
else:
Expand Down Expand Up @@ -171,6 +186,10 @@ def node_is_match(self, node):


class FreshnessTask(RunTask):
def __init__(self, args, config, manifest) -> None:
    """Initialise the task with an empty metadata freshness cache.

    All arguments are forwarded unchanged to the parent initialiser.
    """
    super().__init__(args, config, manifest)
    # Relation -> freshness lookup; filled by populate_metadata_freshness_cache().
    self._metadata_freshness_cache: Dict[BaseRelation, FreshnessResult] = {}

def result_path(self):
if self.args.output:
return os.path.realpath(self.args.output)
Expand All @@ -190,6 +209,17 @@ def get_node_selector(self):
resource_types=[NodeType.Source],
)

def before_run(self, adapter, selected_uids: AbstractSet[str]) -> None:
    """Run the parent hook, then pre-compute metadata freshness in batch when supported."""
    super().before_run(adapter, selected_uids)
    # Adapters without the batch capability skip the pre-computation entirely.
    if not adapter.supports(Capability.TableLastModifiedMetadataBatch):
        return
    self.populate_metadata_freshness_cache(adapter, selected_uids)

def get_runner(self, node) -> BaseRunner:
    """Build the runner for ``node`` and hand it this task's metadata freshness cache."""
    runner = super().get_runner(node)
    # set_metadata_freshness_cache is defined on FreshnessRunner, so narrow the type first.
    assert isinstance(runner, FreshnessRunner)
    runner.set_metadata_freshness_cache(self._metadata_freshness_cache)
    return runner

def get_runner_type(self, _):
    """Return the runner class for this task; the argument is ignored."""
    return FreshnessRunner

Expand All @@ -214,3 +244,40 @@ def get_hooks_by_type(self, hook_type: RunHookType) -> List[HookNode]:
return super().get_hooks_by_type(hook_type)
else:
return []

def populate_metadata_freshness_cache(self, adapter, selected_uids: AbstractSet[str]) -> None:
    """Fill the metadata freshness cache with one batch call to the adapter.

    Only selected sources found in the manifest that have no `loaded_at_field`
    are included in the batch. A failure of the batch call is logged as a
    warning and otherwise ignored, leaving the cache as it was.

    Raises:
        DbtInternalError: if the manifest has not been set.
    """
    manifest = self.manifest
    if manifest is None:
        raise DbtInternalError("Manifest must be set to populate metadata freshness cache")

    batch_metadata_sources: List[BaseRelation] = []
    for uid in selected_uids:
        source = manifest.sources.get(uid)
        # Skip uids that are not sources, and sources that have a loaded_at_field.
        if not source or source.loaded_at_field is not None:
            continue
        batch_metadata_sources.append(adapter.Relation.create_from(self.config, source))

    fire_event(
        Note(
            msg=f"Pulling freshness from warehouse metadata tables for {len(batch_metadata_sources)} sources"
        ),
        EventLevel.INFO,
    )

    try:
        _, metadata_freshness_results = adapter.calculate_freshness_from_metadata_batch(
            batch_metadata_sources
        )
        self._metadata_freshness_cache.update(metadata_freshness_results)
    except Exception as exc:
        # Deliberately coarse: whatever goes wrong during the batch computation, the
        # cache can safely stay unpopulated. Downstream this is handled as a cache miss
        # and freshness is still computed from metadata one source at a time.
        fire_event(
            Note(msg=f"Metadata freshness could not be computed in batch: {exc}"),
            EventLevel.WARN,
        )

def get_freshness_metadata_cache(self) -> Dict[BaseRelation, FreshnessResult]:
    """Return the task's metadata freshness cache (the live mapping, not a copy)."""
    return self._metadata_freshness_cache
2 changes: 1 addition & 1 deletion core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"minimal-snowplow-tracker>=0.0.2,<0.1",
"dbt-semantic-interfaces>=0.5.1,<0.6",
# Minor versions for these are expected to be backwards-compatible
"dbt-common<2.0",
"dbt-common>=1.0.1,<2.0",
"dbt-adapters>=0.1.0a2,<2.0",
# ----
# Expect compatibility with all new versions of these packages, so lower bounds only.
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main
git+https://github.com/dbt-labs/dbt-adapters.git
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-postgres.git@main
Expand Down
Loading

0 comments on commit 02a545c

Please sign in to comment.