From 6a522a9d16ba9593a932161df24ff17efab2f03b Mon Sep 17 00:00:00 2001 From: Vijay Kiran Date: Tue, 21 Mar 2023 13:38:19 +0100 Subject: [PATCH] Add support for Group by checks (#1840) Here's the syntax for a group by check: this will result in a check for every group value (marital_status) ```yaml - group by: query: | SELECT marital_status, AVG(vacation_hours) as vacation_hours FROM dim_employee GROUP BY marital_status fields: - marital_status checks: - vacation_hours: fail: when > 60 name: Average vacation hours ``` --- .gitignore | 1 + soda/core/soda/cli/cli.py | 2 +- soda/core/soda/execution/check/check.py | 50 +++++++++++-------- .../soda/execution/check/group_by_check.py | 33 +++++++++--- soda/core/soda/execution/check_type.py | 9 ++++ soda/core/soda/soda_cloud/soda_cloud.py | 11 ++-- soda/core/soda/sodacl/sodacl_parser.py | 2 +- 7 files changed, 73 insertions(+), 35 deletions(-) create mode 100644 soda/core/soda/execution/check_type.py diff --git a/.gitignore b/.gitignore index 7d75ec456..62e681b70 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,7 @@ scripts/antlr*.jar __pycache__/ *.py[cod] *$py.class +pyrightconfig.json # C extensions *.so diff --git a/soda/core/soda/cli/cli.py b/soda/core/soda/cli/cli.py index a31da5093..6caa57c49 100644 --- a/soda/core/soda/cli/cli.py +++ b/soda/core/soda/cli/cli.py @@ -94,7 +94,7 @@ def scan( data_timestamp: str, variable: list[str], verbose: bool | None, - scan_results_file: Optional[str] = None, + scan_results_file: str | None = None, ): """ The soda scan command: diff --git a/soda/core/soda/execution/check/check.py b/soda/core/soda/execution/check/check.py index 53e19e06d..889755d08 100644 --- a/soda/core/soda/execution/check/check.py +++ b/soda/core/soda/execution/check/check.py @@ -6,6 +6,7 @@ from soda.common.attributes_handler import AttributeHandler from soda.execution.check_outcome import CheckOutcome +from soda.execution.check_type import CheckType from soda.execution.column import Column from soda.execution.identity import ConsistentHashBuilder from soda.execution.metric.metric import Metric @@ -135,6 +136,11 @@ def __init__( self.outcome_reasons: list[dict] = [] self.force_send_results_to_cloud = False + # Default check type is Cloud, when this is set to CheckType.LOCAL, the check will not be sent to cloud + self.check_type = CheckType.CLOUD + + self.cloud_dict = {} + @property def name(self) -> str: """User readable name. @@ -231,32 +237,34 @@ def get_cloud_dict(self): from soda.execution.column import Column from soda.execution.partition import Partition - cloud_dict = { - # See https://sodadata.atlassian.net/browse/CLOUD-1143 - "identity": self.create_identity(with_datasource=True, with_filename=True), - "identities": self.create_identities(), - "name": self.name, - "type": self.cloud_check_type, - "definition": self.create_definition(), - "resourceAttributes": self._format_attributes(), - "location": self.check_cfg.location.get_cloud_dict(), - "dataSource": self.data_source_scan.data_source.data_source_name, - "table": Partition.get_table_name(self.partition), - # "filter": Partition.get_partition_name(self.partition), TODO: re-enable once backend supports the property. - "column": Column.get_partition_name(self.column), - "metrics": [metric.identity for metric in self.metrics.values()], - "outcome": self.outcome.value if self.outcome else None, - "diagnostics": self.get_cloud_diagnostics_dict(), - "source": "soda-core", - } + self.cloud_dict.update( + { + # See https://sodadata.atlassian.net/browse/CLOUD-1143 + "identity": self.create_identity(with_datasource=True, with_filename=True), + "identities": self.create_identities(), + "name": self.name, + "type": self.cloud_check_type, + "definition": self.create_definition(), + "resourceAttributes": self._format_attributes(), + "location": self.check_cfg.location.get_cloud_dict(), + "dataSource": self.data_source_scan.data_source.data_source_name, + "table": Partition.get_table_name(self.partition), + # "filter": Partition.get_partition_name(self.partition), TODO: re-enable once backend supports the property. + "column": Column.get_partition_name(self.column), + "metrics": [metric.identity for metric in self.metrics.values()], + "outcome": self.outcome.value if self.outcome else None, + "diagnostics": self.get_cloud_diagnostics_dict(), + "source": "soda-core", + } + ) # Update dict if automated monitoring is running if self.archetype is not None: - cloud_dict.update({"archetype": self.archetype}) + self.cloud_dict.update({"archetype": self.archetype}) # Update dict if check is skipped and we want to push reason to cloud if self.outcome_reasons: - cloud_dict.update({"outcomeReasons": self.outcome_reasons}) - return cloud_dict + self.cloud_dict.update({"outcomeReasons": self.outcome_reasons}) + return self.cloud_dict def get_dict(self): from soda.execution.column import Column diff --git a/soda/core/soda/execution/check/group_by_check.py b/soda/core/soda/execution/check/group_by_check.py index dfcd03979..dfb79ee12 100644 --- a/soda/core/soda/execution/check/group_by_check.py +++ b/soda/core/soda/execution/check/group_by_check.py @@ -3,8 +3,8 @@ import copy from soda.execution.check.check import Check -from soda.execution.check.metric_check import MetricCheck from soda.execution.check_outcome import CheckOutcome +from soda.execution.check_type import CheckType from soda.execution.metric.metric import Metric from soda.execution.partition import Partition @@ -24,17 +24,17 @@ def __init__( partition=partition, column=None, ) - from soda.sodacl.group_by_check_cfg import GroupByCheckCfg - check_cfg: GroupByCheckCfg = self.check_cfg self.check_value = None + self.check_type = CheckType.LOCAL + from soda.execution.metric.group_by_metric import GroupByMetric group_by_metric = data_source_scan.resolve_metric( GroupByMetric( data_source_scan=self.data_source_scan, partition=partition, - query=check_cfg.query, + query=self.check_cfg.query, check=self, ) ) @@ -56,10 +56,14 @@ def evaluate(self, metrics: dict[str, Metric], historic_values: dict[str, object group_checks = [] for group in groups: for gcc in group_check_cfgs: - config = copy.copy(gcc) - config.name = gcc.name + f" [{','.join(str(v) for v in group)}]" + group_name = f"{','.join(str(v) for v in group)}" + config = copy.deepcopy(gcc) + config.name = gcc.name + f" [{group_name}]" + config.source_configurations["group_value"] = f"[{group_name}]" column = ",".join(fields) - gc = MetricCheck(config, self.data_source_scan, partition=self.partition, column=column) + gc = Check.create( + check_cfg=config, data_source_scan=self.data_source_scan, partition=self.partition, column=column + ) result = next(filter(lambda qr: tuple(map(qr.get, fields)) == group, query_results)) if result is not None: gc.check_value = result[config.metric_name] @@ -71,14 +75,27 @@ def evaluate(self, metrics: dict[str, Metric], historic_values: dict[str, object check=None, identity_parts=[], ) + + # TODO fetch historic values, change over time checks will not work yet + # historic_values = {} + # if gc.historic_descriptors: + # for hd_key, hd in gc.historic_descriptors.items(): + # print(f"hd_key: {hd_key}, hd: {hd}") + # historic_values[hd_key] = self.data_source_scan.scan.__get_historic_data_from_soda_cloud_metric_store(hd) + metric.set_value(gc.check_value) + self.data_source_scan.scan._add_metric(metric) gc.metrics = {config.metric_name: metric} gc.evaluate(metrics=None, historic_values=None) + + cloud_group_attr = { + "group": {"identity": self.create_identity(), "name": gcc.name, "distinctLabel": group_name} + } + gc.cloud_dict.update(cloud_group_attr) group_checks.append(gc) self.data_source_scan.scan._checks.extend(group_checks) - # TODO decide what to do with global check state if all(gc.outcome == CheckOutcome.PASS for gc in group_checks): self.outcome = CheckOutcome.PASS elif any(gc.outcome == CheckOutcome.FAIL for gc in group_checks): diff --git a/soda/core/soda/execution/check_type.py b/soda/core/soda/execution/check_type.py new file mode 100644 index 000000000..e3995fe86 --- /dev/null +++ b/soda/core/soda/execution/check_type.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class CheckType(Enum): + CLOUD = 1 + LOCAL = 2 + + def __str__(self): + return self.name diff --git a/soda/core/soda/soda_cloud/soda_cloud.py b/soda/core/soda/soda_cloud/soda_cloud.py index 201c48741..5eea75e12 100644 --- a/soda/core/soda/soda_cloud/soda_cloud.py +++ b/soda/core/soda/soda_cloud/soda_cloud.py @@ -12,6 +12,7 @@ from soda.__version__ import SODA_CORE_VERSION from soda.common.json_helper import JsonHelper from soda.common.logs import Logs +from soda.execution.check_type import CheckType from soda.soda_cloud.historic_descriptor import ( HistoricChangeOverTimeDescriptor, HistoricCheckResultsDescriptor, @@ -71,12 +72,14 @@ def build_scan_results(scan) -> dict: checks = [ check.get_cloud_dict() for check in scan._checks - if (check.outcome is not None or check.force_send_results_to_cloud == True) and check.archetype is None + if check.check_type == CheckType.CLOUD + and (check.outcome is not None or check.force_send_results_to_cloud is True) + and check.archetype is None ] automated_monitoring_checks = [ check.get_cloud_dict() for check in scan._checks - if (check.outcome is not None or check.force_send_results_to_cloud == True) and check.archetype is not None + if (check.outcome is not None or check.force_send_results_to_cloud is True) and check.archetype is not None ] # TODO: [SODA-608] separate profile columns and sample tables by aligning with the backend team @@ -162,7 +165,7 @@ def upload_sample( self.logs.error(f"Soda cloud error: Could not upload sample {sample_file_name}", exception=e) def _fileify(self, name: str): - return re.sub(r"[^A-Za-z0-9_]+", "_", name).lower() + return re.sub(r"\W+", "_", name).lower() def _upload_sample_http(self, scan_definition_name: str, file_path, temp_file, file_size_in_bytes: int): headers = { @@ -338,7 +341,7 @@ def _execute_request(self, request_type: str, request_body: dict, is_retry: bool ) response_json = response.json() if response.status_code == 401 and not is_retry: - logger.debug(f"Authentication failed. Probably token expired. Re-authenticating...") + logger.debug("Authentication failed. Probably token expired. Re-authenticating...") self.token = None response_json = self._execute_request(request_type, request_body, True, request_name) elif response.status_code != 200: diff --git a/soda/core/soda/sodacl/sodacl_parser.py b/soda/core/soda/sodacl/sodacl_parser.py index b34fc7653..a3ba9f6d8 100644 --- a/soda/core/soda/sodacl/sodacl_parser.py +++ b/soda/core/soda/sodacl/sodacl_parser.py @@ -335,7 +335,7 @@ def parse_group_by_cfg(self, check_configurations, check_str, header_str): name = self._get_optional(NAME, str) try: - group_limit = self._get_optional("group_limit", int) + group_limit = self._get_optional("group_limit", int) or 1000 query = self._get_required("query", str) fields = self._get_required("fields", list) check_cfgs = self._get_required("checks", list)