Add support for Group by checks (#1840)
Here's the syntax for a group-by check; it results in one check per group value (here, one per `marital_status`):

```yaml
 - group by:
      query: |
        SELECT marital_status, AVG(vacation_hours) as vacation_hours
        FROM dim_employee
        GROUP BY marital_status
      fields:
        - marital_status
      checks:
        - vacation_hours:
            fail: when > 60
            name: Average vacation hours
```
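
Judging by the `sodacl_parser.py` change below, two keys besides `query`, `fields`, and `checks` are optional: a check-level `name` (shown above) and an integer `group_limit`, which falls back to 1000 when omitted (its use as a cap on the number of groups is not shown in this diff). A fuller sketch of the accepted keys — the limit of 10 is illustrative:

```yaml
 - group by:
      group_limit: 10
      query: |
        SELECT marital_status, AVG(vacation_hours) as vacation_hours
        FROM dim_employee
        GROUP BY marital_status
      fields:
        - marital_status
      checks:
        - vacation_hours:
            fail: when > 60
            name: Average vacation hours
```

Per `group_by_check.py` below, each generated check is named after the configured check plus the group value, e.g. `Average vacation hours [Married]` for the `Married` group.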
vijaykiran authored Mar 21, 2023
1 parent a6c9dad commit 6a522a9
Showing 7 changed files with 73 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -12,6 +12,7 @@ scripts/antlr*.jar
 __pycache__/
 *.py[cod]
 *$py.class
+pyrightconfig.json

 # C extensions
 *.so
2 changes: 1 addition & 1 deletion soda/core/soda/cli/cli.py
@@ -94,7 +94,7 @@ def scan(
     data_timestamp: str,
     variable: list[str],
     verbose: bool | None,
-    scan_results_file: Optional[str] = None,
+    scan_results_file: str | None = None,
 ):
     """
     The soda scan command:
50 changes: 29 additions & 21 deletions soda/core/soda/execution/check/check.py
@@ -6,6 +6,7 @@

 from soda.common.attributes_handler import AttributeHandler
 from soda.execution.check_outcome import CheckOutcome
+from soda.execution.check_type import CheckType
 from soda.execution.column import Column
 from soda.execution.identity import ConsistentHashBuilder
 from soda.execution.metric.metric import Metric
@@ -135,6 +136,11 @@ def __init__(
         self.outcome_reasons: list[dict] = []
         self.force_send_results_to_cloud = False

+        # Default check type is CLOUD; when set to CheckType.LOCAL, the check is not sent to Soda Cloud
+        self.check_type = CheckType.CLOUD
+
+        self.cloud_dict = {}
+
     @property
     def name(self) -> str:
         """User readable name.
Expand Down Expand Up @@ -231,32 +237,34 @@ def get_cloud_dict(self):
from soda.execution.column import Column
from soda.execution.partition import Partition

cloud_dict = {
# See https://sodadata.atlassian.net/browse/CLOUD-1143
"identity": self.create_identity(with_datasource=True, with_filename=True),
"identities": self.create_identities(),
"name": self.name,
"type": self.cloud_check_type,
"definition": self.create_definition(),
"resourceAttributes": self._format_attributes(),
"location": self.check_cfg.location.get_cloud_dict(),
"dataSource": self.data_source_scan.data_source.data_source_name,
"table": Partition.get_table_name(self.partition),
# "filter": Partition.get_partition_name(self.partition), TODO: re-enable once backend supports the property.
"column": Column.get_partition_name(self.column),
"metrics": [metric.identity for metric in self.metrics.values()],
"outcome": self.outcome.value if self.outcome else None,
"diagnostics": self.get_cloud_diagnostics_dict(),
"source": "soda-core",
}
self.cloud_dict.update(
{
# See https://sodadata.atlassian.net/browse/CLOUD-1143
"identity": self.create_identity(with_datasource=True, with_filename=True),
"identities": self.create_identities(),
"name": self.name,
"type": self.cloud_check_type,
"definition": self.create_definition(),
"resourceAttributes": self._format_attributes(),
"location": self.check_cfg.location.get_cloud_dict(),
"dataSource": self.data_source_scan.data_source.data_source_name,
"table": Partition.get_table_name(self.partition),
# "filter": Partition.get_partition_name(self.partition), TODO: re-enable once backend supports the property.
"column": Column.get_partition_name(self.column),
"metrics": [metric.identity for metric in self.metrics.values()],
"outcome": self.outcome.value if self.outcome else None,
"diagnostics": self.get_cloud_diagnostics_dict(),
"source": "soda-core",
}
)
# Update dict if automated monitoring is running
if self.archetype is not None:
cloud_dict.update({"archetype": self.archetype})
self.cloud_dict.update({"archetype": self.archetype})

# Update dict if check is skipped and we want to push reason to cloud
if self.outcome_reasons:
cloud_dict.update({"outcomeReasons": self.outcome_reasons})
return cloud_dict
self.cloud_dict.update({"outcomeReasons": self.outcome_reasons})
return self.cloud_dict

def get_dict(self):
from soda.execution.column import Column
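
Note the switch in `get_cloud_dict` from a local `cloud_dict` variable to the new `self.cloud_dict` instance attribute: callers can now attach extra payload entries before serialization, which is exactly what `GroupByCheck.evaluate` does below. A minimal sketch of the pattern; `MiniCheck` is a hypothetical, simplified stand-in, not the real `Check` API:

```python
class MiniCheck:
    """Hypothetical, simplified stand-in for soda.execution.check.check.Check."""

    def __init__(self, name: str):
        self.name = name
        # Instance-level dict: entries placed here before get_cloud_dict()
        # runs survive the update() below and land in the cloud payload.
        self.cloud_dict: dict = {}

    def get_cloud_dict(self) -> dict:
        self.cloud_dict.update({"name": self.name, "source": "soda-core"})
        return self.cloud_dict


gc = MiniCheck("Average vacation hours [Married]")
# A group-by parent can pre-seed per-group metadata, as group_by_check.py does:
gc.cloud_dict.update({"group": {"name": "Average vacation hours", "distinctLabel": "Married"}})
assert set(gc.get_cloud_dict()) == {"group", "name", "source"}
```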
33 changes: 25 additions & 8 deletions soda/core/soda/execution/check/group_by_check.py
@@ -3,8 +3,8 @@
 import copy

 from soda.execution.check.check import Check
-from soda.execution.check.metric_check import MetricCheck
 from soda.execution.check_outcome import CheckOutcome
+from soda.execution.check_type import CheckType
 from soda.execution.metric.metric import Metric
 from soda.execution.partition import Partition

@@ -24,17 +24,17 @@ def __init__(
             partition=partition,
             column=None,
         )
-        from soda.sodacl.group_by_check_cfg import GroupByCheckCfg
-
-        check_cfg: GroupByCheckCfg = self.check_cfg
         self.check_value = None
+        self.check_type = CheckType.LOCAL

         from soda.execution.metric.group_by_metric import GroupByMetric

         group_by_metric = data_source_scan.resolve_metric(
             GroupByMetric(
                 data_source_scan=self.data_source_scan,
                 partition=partition,
-                query=check_cfg.query,
+                query=self.check_cfg.query,
                 check=self,
             )
         )
@@ -56,10 +56,14 @@ def evaluate(self, metrics: dict[str, Metric], historic_values: dict[str, object
         group_checks = []
         for group in groups:
             for gcc in group_check_cfgs:
-                config = copy.copy(gcc)
-                config.name = gcc.name + f" [{','.join(str(v) for v in group)}]"
+                group_name = f"{','.join(str(v) for v in group)}"
+                config = copy.deepcopy(gcc)
+                config.name = gcc.name + f" [{group_name}]"
+                config.source_configurations["group_value"] = f"[{group_name}]"
                 column = ",".join(fields)
-                gc = MetricCheck(config, self.data_source_scan, partition=self.partition, column=column)
+                gc = Check.create(
+                    check_cfg=config, data_source_scan=self.data_source_scan, partition=self.partition, column=column
+                )
                 result = next(filter(lambda qr: tuple(map(qr.get, fields)) == group, query_results))
                 if result is not None:
                     gc.check_value = result[config.metric_name]
@@ -71,14 +75,27 @@
                     check=None,
                     identity_parts=[],
                 )
+
+                # TODO fetch historic values, change over time checks will not work yet
+                # historic_values = {}
+                # if gc.historic_descriptors:
+                #     for hd_key, hd in gc.historic_descriptors.items():
+                #         print(f"hd_key: {hd_key}, hd: {hd}")
+                #         historic_values[hd_key] = self.data_source_scan.scan.__get_historic_data_from_soda_cloud_metric_store(hd)
+
                 metric.set_value(gc.check_value)
                 self.data_source_scan.scan._add_metric(metric)
                 gc.metrics = {config.metric_name: metric}
                 gc.evaluate(metrics=None, historic_values=None)
+
+                cloud_group_attr = {
+                    "group": {"identity": self.create_identity(), "name": gcc.name, "distinctLabel": group_name}
+                }
+                gc.cloud_dict.update(cloud_group_attr)
                 group_checks.append(gc)

         self.data_source_scan.scan._checks.extend(group_checks)

         # TODO decide what to do with global check state
         if all(gc.outcome == CheckOutcome.PASS for gc in group_checks):
             self.outcome = CheckOutcome.PASS
         elif any(gc.outcome == CheckOutcome.FAIL for gc in group_checks):
9 changes: 9 additions & 0 deletions soda/core/soda/execution/check_type.py
@@ -0,0 +1,9 @@
+from enum import Enum
+
+
+class CheckType(Enum):
+    CLOUD = 1
+    LOCAL = 2
+
+    def __str__(self):
+        return self.name
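
A sketch of how this enum threads through the commit: `GroupByCheck` marks itself `LOCAL` so that only its generated per-group checks are uploaded, while `build_scan_results` in `soda_cloud.py` keeps only `CLOUD` checks. The `Stub` class and check names here are illustrative, not the real API:

```python
from enum import Enum


class CheckType(Enum):
    CLOUD = 1
    LOCAL = 2


class Stub:
    def __init__(self, name: str, check_type: CheckType):
        self.name = name
        self.check_type = check_type


checks = [
    Stub("group-by parent (evaluated locally, never uploaded)", CheckType.LOCAL),
    Stub("Average vacation hours [Married]", CheckType.CLOUD),
]
# Mirrors the filter added to build_scan_results in soda_cloud.py below:
uploaded = [c.name for c in checks if c.check_type == CheckType.CLOUD]
print(uploaded)  # ['Average vacation hours [Married]']
```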
11 changes: 7 additions & 4 deletions soda/core/soda/soda_cloud/soda_cloud.py
@@ -12,6 +12,7 @@
 from soda.__version__ import SODA_CORE_VERSION
 from soda.common.json_helper import JsonHelper
 from soda.common.logs import Logs
+from soda.execution.check_type import CheckType
 from soda.soda_cloud.historic_descriptor import (
     HistoricChangeOverTimeDescriptor,
     HistoricCheckResultsDescriptor,
@@ -71,12 +72,14 @@ def build_scan_results(scan) -> dict:
         checks = [
             check.get_cloud_dict()
             for check in scan._checks
-            if (check.outcome is not None or check.force_send_results_to_cloud == True) and check.archetype is None
+            if check.check_type == CheckType.CLOUD
+            and (check.outcome is not None or check.force_send_results_to_cloud is True)
+            and check.archetype is None
         ]
         automated_monitoring_checks = [
             check.get_cloud_dict()
             for check in scan._checks
-            if (check.outcome is not None or check.force_send_results_to_cloud == True) and check.archetype is not None
+            if (check.outcome is not None or check.force_send_results_to_cloud is True) and check.archetype is not None
         ]

         # TODO: [SODA-608] separate profile columns and sample tables by aligning with the backend team
@@ -162,7 +165,7 @@ def upload_sample(
             self.logs.error(f"Soda cloud error: Could not upload sample {sample_file_name}", exception=e)

     def _fileify(self, name: str):
-        return re.sub(r"[^A-Za-z0-9_]+", "_", name).lower()
+        return re.sub(r"\W+", "_", name).lower()

     def _upload_sample_http(self, scan_definition_name: str, file_path, temp_file, file_size_in_bytes: int):
         headers = {
@@ -338,7 +341,7 @@ def _execute_request(self, request_type: str, request_body: dict, is_retry: bool
             )
             response_json = response.json()
             if response.status_code == 401 and not is_retry:
-                logger.debug(f"Authentication failed. Probably token expired. Re-authenticating...")
+                logger.debug("Authentication failed. Probably token expired. Re-authenticating...")
                 self.token = None
                 response_json = self._execute_request(request_type, request_body, True, request_name)
             elif response.status_code != 200:
2 changes: 1 addition & 1 deletion soda/core/soda/sodacl/sodacl_parser.py
@@ -335,7 +335,7 @@ def parse_group_by_cfg(self, check_configurations, check_str, header_str):
         name = self._get_optional(NAME, str)

         try:
-            group_limit = self._get_optional("group_limit", int)
+            group_limit = self._get_optional("group_limit", int) or 1000
             query = self._get_required("query", str)
             fields = self._get_required("fields", list)
             check_cfgs = self._get_required("checks", list)
