Sync main from soda-core #9

Open

wants to merge 112 commits into base: main

Changes from 1 commit of 112 commits

Commits
5164d34
Catch exceptions while building results file (#1936)
m1n0 Sep 13, 2023
71dfe19
[pre-commit.ci] pre-commit autoupdate (#1935)
pre-commit-ci[bot] Sep 14, 2023
8fa452e
Reference check: support must NOT exist (#1937)
m1n0 Sep 18, 2023
995b4ac
Bump to 3.0.49
m1n0 Sep 19, 2023
67597f2
Add thresholds and diagnostics to scan result (#1939)
m1n0 Sep 21, 2023
8e74d93
Fix databricks numeric types profiling (#1941)
m1n0 Sep 27, 2023
67111aa
Bump to 3.0.50
m1n0 Sep 27, 2023
f743fc7
Allow to specify virtual file name for add sodacl string (#1943)
m1n0 Oct 2, 2023
3fdac3c
Feature/add more file formats for duckdb (#1942)
PaoloLeonard Oct 6, 2023
b34e271
added BigQuery Job Labels (#1947)
m1n0 Oct 10, 2023
d25316f
Bump to 3.0.51
m1n0 Oct 11, 2023
2f67adb
Distribution: compute value counts in DB rather than in python
baturayo Oct 13, 2023
fe27fc3
Fix 3.8 compatibility
m1n0 Oct 17, 2023
431a0ee
feat: Add Dask/Pandas configurable data source naming support (#1951)
dirkgroenen Oct 25, 2023
5312c43
Bump to 3.0.52
dirkgroenen Oct 25, 2023
f6505f0
Freshness: support mixed thresholds (#1957)
m1n0 Oct 31, 2023
7affe19
Add License to every package (#1958)
m1n0 Nov 1, 2023
b3c112e
Bump to 3.0.53
m1n0 Nov 1, 2023
2c9cde9
Failed rows check: support thresholds (#1960)
m1n0 Nov 3, 2023
59191bf
Updated install doc to include MotherDuck support via DuckDB (#1963)
janet-can Nov 7, 2023
c7182b1
remove % from pattern (#1956)
chuwangBA Nov 9, 2023
7505aa3
Sqlserver: support quoting tables with brackets, "quote_tables" mode …
m1n0 Nov 14, 2023
644546d
Bump to 3.0.54
m1n0 Nov 14, 2023
5f268b8
Contracts
tombaeyens Nov 15, 2023
6ffddd9
Fix check source payload (#1966)
m1n0 Nov 15, 2023
2a142e7
Bump to 3.1.0
m1n0 Nov 16, 2023
3f8fcc7
Update python api docs (#1967)
m1n0 Nov 16, 2023
88640a9
Make custom identity fixed as v4 (#1968)
m1n0 Nov 20, 2023
09c00a2
Freshness: support in-check filters (#1970)
m1n0 Dec 1, 2023
ae8d325
Bump to 3.1.1
m1n0 Dec 2, 2023
8249949
Adding support for authentication via a chained list of delegate acco…
nathadfield Dec 15, 2023
17c67cf
fix anomaly detection frequency aggregation bug (#1975)
baturayo Dec 15, 2023
46206eb
upgrade pydantic from v1 to v2 (#1974)
baturayo Dec 15, 2023
cb950c9
[pre-commit.ci] pre-commit autoupdate (#1938)
pre-commit-ci[bot] Dec 15, 2023
b7103e1
Bump to 3.1.2
m1n0 Dec 15, 2023
e80f118
feat: implement warn_only for anomaly score (#156) (#1980)
baturayo Dec 27, 2023
3c05346
Bump to 3.1.3
m1n0 Jan 3, 2024
1a44ce0
Dbt: improve parsing logs (#1981)
m1n0 Jan 4, 2024
2bde90c
Sampler: fix link href (#1983)
m1n0 Jan 5, 2024
c3c9521
Document group by example for Soda Core with failed rows check (#1984)
janet-can Jan 5, 2024
45a5a74
Schema check: support custom identity (#1988)
m1n0 Jan 16, 2024
34d65af
Add semver release with major, minor, latest (#1993)
dirkgroenen Jan 23, 2024
036204b
bug: handle null values for continuous dist (#165) (#1994)
baturayo Jan 23, 2024
55b85f5
[pre-commit.ci] pre-commit autoupdate (#1977)
pre-commit-ci[bot] Jan 23, 2024
ceab226
feat: implement new anomaly detection in soda core (#1995)
baturayo Jan 24, 2024
9445d1e
feat: support built-in prophet public holidays (#1997)
baturayo Jan 24, 2024
64bc338
Bump to 3.1.4
m1n0 Jan 24, 2024
b6f4329
Hive data source improvements (#1982)
robertomorandeira Jan 24, 2024
79b513a
feat: implement migrate from anomaly score check config (#168) (#1998)
baturayo Jan 25, 2024
311f1f2
Bump Prophet (#2000)
m1n0 Jan 25, 2024
89da879
Tests: use approx comparison for floats (#1999)
m1n0 Jan 25, 2024
8e0ae62
hive: add configuration parameters (#36)
vijaykiran Jul 3, 2023
2d00558
Bump to 3.1.5
m1n0 Jan 26, 2024
594d026
feat: implement severity level paramaters (#2001)
baturayo Jan 29, 2024
339309f
Always use datasource specifis COUNT expression (#2003)
m1n0 Jan 29, 2024
51a30fb
fix: anomaly detection feedbacks (#2005)
baturayo Jan 31, 2024
70b8753
[pre-commit.ci] pre-commit autoupdate (#2002)
pre-commit-ci[bot] Feb 2, 2024
1d2e8ac
feat: anomaly detection simulator (#163) (#2010)
baturayo Feb 6, 2024
e172b7d
feat: added dremio token support (#2009)
JorisTruong Feb 7, 2024
fc8e191
Bump to 3.2.0
m1n0 Feb 8, 2024
68d44b3
feat: correctly identified anomalies are excluded from training data …
baturayo Feb 9, 2024
1a211f5
fix: show more clearly the detected frequency using warning message f…
baturayo Feb 9, 2024
16ea0b9
Fix simulator import and streamlit path (#2017)
m1n0 Feb 12, 2024
a02f463
[pre-commit.ci] pre-commit autoupdate (#2016)
pre-commit-ci[bot] Feb 13, 2024
2c3ce9d
Update oracle_data_source.py (#2012)
vinod901 Feb 13, 2024
eb2abf9
Oracle: cast config to str/int to prevent oracledb errors (#2018)
m1n0 Feb 13, 2024
dd63d9e
Bump to 3.2.1
m1n0 Feb 13, 2024
ea5831e
Fix assets folder (#2020)
m1n0 Feb 14, 2024
f47801c
fix timezone issue and log messages (#188) (#2023)
baturayo Feb 21, 2024
fe70d82
feat: in anomaly detection simulator use soda core historic check res…
baturayo Feb 28, 2024
7d2ed7b
Update dask-sql (#2026)
m1n0 Feb 29, 2024
f07eba9
Add dask-sql version comment
m1n0 Feb 29, 2024
97c3545
Bump to 3.2.2
m1n0 Feb 29, 2024
6245a4c
feat: implement daily and monthly seasonality to external regressor ……
baturayo Feb 29, 2024
b62550e
Dremio: fix token support (#2028)
m1n0 Mar 6, 2024
8179c50
Bump to 3.2.3
m1n0 Mar 6, 2024
8e41a2c
[pre-commit.ci] pre-commit autoupdate (#2022)
pre-commit-ci[bot] Mar 11, 2024
91dd60f
bugfix: support attributes on multiple checks (#2032)
milanaleksic Mar 12, 2024
e3787d1
Use dbt's new access_url pattern to access cloud API (#2035)
bastienboutonnet Mar 14, 2024
c25a872
Bump to 3.2.4
m1n0 Mar 16, 2024
98c52ce
Contracts 2nd iteration (#2006)
tombaeyens Mar 16, 2024
bd04e84
Bump to 3.3.0
m1n0 Mar 16, 2024
a1a2008
feat: improved wording and tooltip formatting in simulator (#2038)
bastienboutonnet Mar 19, 2024
c20eb59
Failed rows: fix warn/fail thresholds (#2042)
m1n0 Mar 22, 2024
de1d4b4
Bump opentelemetry to 1.22 (#2043)
m1n0 Mar 22, 2024
d4b8183
Bump dev requirements (#2045)
m1n0 Mar 23, 2024
ae33e9f
Bump to 3.3.1
m1n0 Mar 24, 2024
aee8045
Rename argument in set_scan_results_file method (#2047)
ozgenbaris1 Apr 9, 2024
2e40e45
Dremio: support disableCertificateVerification option (#2049)
m1n0 Apr 9, 2024
9e95906
[pre-commit.ci] pre-commit autoupdate (#2037)
pre-commit-ci[bot] Apr 16, 2024
1d21a34
Denodo: fix connection timeout attribute (#2065)
m1n0 Apr 23, 2024
34ace6a
Update db2_data_source.py (#2063)
4rahulae Apr 23, 2024
c046af0
Bump to 3.3.2
m1n0 Apr 24, 2024
76159ca
Update autoflake precommit (#2070)
m1n0 Apr 30, 2024
062b1e2
Contracts v3 (#2067)
tombaeyens Apr 30, 2024
5e51e69
Bump to 3.3.3
tombaeyens Apr 30, 2024
31b1ab3
Fix automated monitoring, prevent duplicate queries (#2075)
m1n0 May 3, 2024
cc02c01
Hive: support scheme (#2077)
m1n0 May 7, 2024
63c73f8
Bump dev requirements (#2078)
m1n0 May 7, 2024
7866d27
Bump deps (#2079)
m1n0 May 7, 2024
8a1ce04
Bump to 3.3.4
m1n0 May 7, 2024
1819347
Failed rows: fix warn/fail thresholds for fail condition (#2084)
m1n0 May 16, 2024
09262b0
upgrade to latest version of ibm-db python client (#2076)
Antoninj May 17, 2024
5d1163c
User defined metric fail query (#2089)
m1n0 May 23, 2024
b014718
Bump to 3.3.5
m1n0 May 23, 2024
4e09b27
CLOUD-7708 - Add Snowflake CI account to pipeline for soda-core (#2088)
dakue-soda May 27, 2024
5776b5e
[CLOUD-7400] Improve memory usage (#2081)
dirkgroenen May 29, 2024
c3dc141
lower pre-commit version to support py38
dirkgroenen May 30, 2024
7e631d5
Duplicate check: fail gracefully in case of error in query (#2093)
m1n0 Jun 5, 2024
552a716
Bump requests and tox/docker (#2094)
m1n0 Jun 5, 2024
af649b9
Duplicate check: support sample exclude columns fully (#2096)
m1n0 Jun 7, 2024
a94bd47
Merge remote-tracking branch 'upstream/main'
bichitra95 Jun 16, 2024
[CLOUD-7400] Improve memory usage (sodadata#2081)
* Improve memory usage

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix broken change

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add pre-commit

* Format correctly, add missing log methods

* Fix tests

* Use correct Query class

* Fix pyspark version

* dev requirements conflict

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
dirkgroenen and pre-commit-ci[bot] authored May 29, 2024
commit 5776b5e0683c64b1d3804ae141565a1c2c31a449
1 change: 1 addition & 0 deletions dev-requirements.in
@@ -15,5 +15,6 @@ readme-renderer~=32.0
certifi>=2022.12.07
wheel>=0.38.1
docutils<0.21 # 0.21 dropped py38 support, remove this after py38 support is gone
pre-commit
requests==2.31.0 # 2.32.0 is broken, does not support docker. Remove this after new version is out

17 changes: 16 additions & 1 deletion dev-requirements.txt
@@ -4,6 +4,9 @@
#
# pip-compile dev-requirements.in
#
--extra-index-url https://pypi.ngc.nvidia.com
--trusted-host pypi.ngc.nvidia.com

black==22.6.0
# via -r dev-requirements.in
bleach==6.1.0
@@ -14,6 +17,8 @@ certifi==2024.2.2
# via
# -r dev-requirements.in
# requests
cfgv==3.4.0
# via pre-commit
charset-normalizer==3.3.2
# via requests
cli-ui==0.17.2
@@ -44,12 +49,16 @@ filelock==3.14.0
# via
# tox
# virtualenv
identify==2.5.36
# via pre-commit
idna==3.7
# via requests
iniconfig==2.0.0
# via pytest
mypy-extensions==1.0.0
# via black
nodeenv==1.8.0
# via pre-commit
packaging==24.0
# via
# build
@@ -69,6 +78,8 @@ pluggy==1.5.0
# via
# pytest
# tox
pre-commit==3.7.1
# via -r dev-requirements.in
py==1.11.0
# via
# pytest-html
@@ -97,6 +108,8 @@ python-dateutil==2.9.0.post0
# via faker
python-dotenv==1.0.1
# via -r dev-requirements.in
pyyaml==6.0.1
# via pre-commit
readme-renderer==32.0
# via -r dev-requirements.in
requests==2.31.0
@@ -140,7 +153,9 @@ urllib3==1.26.18
# docker
# requests
virtualenv==20.26.2
# via tox
# via
# pre-commit
# tox
webencodings==0.5.1
# via bleach
websocket-client==1.8.0
16 changes: 14 additions & 2 deletions soda/core/soda/common/logs.py
@@ -19,6 +19,7 @@ def configure_logging():
logging.getLogger("pyspark").setLevel(logging.ERROR)
logging.getLogger("pyhive").setLevel(logging.ERROR)
logging.getLogger("py4j").setLevel(logging.INFO)
logging.getLogger("segment").setLevel(logging.WARNING)
logging.basicConfig(
level=logging.DEBUG,
force=True, # Override any previously set handlers.
@@ -30,12 +31,23 @@ def configure_logging():


class Logs:
def __init__(self, logger: Logger):
self.logger: Logger = logger
__instance = None

def __new__(cls, logger: Logger = None):
if cls.__instance is None:
cls.__instance = super().__new__(cls)
cls.__instance._initialize()
return cls.__instance

def _initialize(self):
self.logs: list[Log] = []
self.logs_buffer: list[Log] = []
self.verbose: bool = False

def reset(self):
self.__instance = Logs()
self.__instance._initialize()

def error(
self,
message: str,
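
The hunk above turns Logs into a singleton: __new__ caches a single instance and _initialize sets up the shared log buffers. A minimal sketch (not part of the PR) of the resulting behaviour, assuming soda-core with this change is installed; note the diff itself already calls Logs() with no arguments, e.g. in MemorySafeCursorFetcher below:

    from soda.common.logs import Logs

    a = Logs()
    b = Logs()
    assert a is b            # __new__ returns the cached instance
    a.verbose = True         # state is shared: every component sees the same flag
    assert b.verbose is True
    print(len(a.logs))       # one shared list of Log entries across the scan
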
48 changes: 48 additions & 0 deletions soda/core/soda/common/memory_safe_cursor_fetcher.py
@@ -0,0 +1,48 @@
from typing import List, Tuple

from soda.common.logs import Logs

BATCH_SIZE = 100


class MemorySafeCursorFetcher:
def __init__(self, cursor, limit=10000):
self._cursor = cursor
self._logs = Logs()
self.limit = limit
self.rows = None
self.limit_exhausted = False
self.total_row_count = -1

def get_row_count(self) -> int:
self.get_rows()
return self.total_row_count

def get_rows(self) -> List[Tuple]:
if self.rows is not None:
return self.rows

self.rows = []
self.total_row_count = 0
while True:
results = self._cursor.fetchmany(BATCH_SIZE)
# Make sure to empty the entire [remote] cursor, even if results are
# no longer needed.
if not results or len(results) == 0:
break

# Count all rows, regardless of storing
self.total_row_count += len(results)

# Only store the needed number of results in memory
if len(self.rows) < self.limit:
self.rows.extend(results[: self.limit - len(self.rows)])
elif self.limit_exhausted is False:
self._logs.warning(
"The query produced a lot of results, which have not all been stored in memory. "
f"Soda limits the number of processed results for sampling-like use-cases to {self.limit}. "
"You might want to consider optimising your query to select less results."
)
self.limit_exhausted = True

return self.rows
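
A usage sketch (not part of the PR) of the new fetcher against an in-memory sqlite3 database; the limit of 5 is chosen only for illustration. The fetcher drains the cursor in BATCH_SIZE chunks, keeps at most limit rows in memory, and still reports the full row count:

    import sqlite3

    from soda.common.memory_safe_cursor_fetcher import MemorySafeCursorFetcher

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE t (n INTEGER)")
    cur.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(1000)])
    cur.execute("SELECT n FROM t")

    fetcher = MemorySafeCursorFetcher(cur, limit=5)
    print(len(fetcher.get_rows()))   # 5    -> only `limit` rows are kept in memory
    print(fetcher.get_row_count())   # 1000 -> the cursor was still drained fully
    conn.close()
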
3 changes: 2 additions & 1 deletion soda/core/soda/execution/data_source.py
@@ -16,6 +16,7 @@
from soda.common.string_helper import string_matches_simple_pattern
from soda.execution.data_type import DataType
from soda.execution.query.query import Query
from soda.execution.query.query_without_results import QueryWithoutResults
from soda.execution.query.schema_query import TableColumnsQuery
from soda.sampler.sample_ref import SampleRef
from soda.sodacl.location import Location
@@ -1076,7 +1077,7 @@ def _optionally_quote_table_name_from_meta_data(self, table_name: str) -> str:

def analyze_table(self, table: str):
if self.sql_analyze_table(table):
Query(
QueryWithoutResults(
data_source_scan=self.data_source_scan,
unqualified_query_name=f"analyze_{table}",
sql=self.sql_analyze_table(table),
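
analyze_table now goes through QueryWithoutResults instead of Query, so statements such as ANALYZE are executed without attempting to fetch rows. The class itself is not shown in this diff; the following is a hypothetical, standalone sketch of the idea only (names and the sqlite3 usage are illustrative, not the soda-core implementation):

    import sqlite3


    class QueryWithoutResults:
        """Hypothetical sketch: execute a statement and deliberately fetch nothing."""

        def __init__(self, connection, sql: str):
            self.connection = connection
            self.sql = sql

        def execute(self) -> None:
            cursor = self.connection.cursor()
            try:
                cursor.execute(self.sql)   # e.g. ANALYZE <table>: no result set expected
            finally:
                cursor.close()


    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (n INTEGER)")
    QueryWithoutResults(conn, "ANALYZE t").execute()
    conn.close()
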
168 changes: 78 additions & 90 deletions soda/core/soda/execution/query/query.py
@@ -3,6 +3,7 @@
from datetime import datetime, timedelta

from soda.common.exception_helper import get_exception_stacktrace
from soda.common.memory_safe_cursor_fetcher import MemorySafeCursorFetcher
from soda.common.query_helper import parse_columns_from_query
from soda.common.string_helper import strip_quotes
from soda.common.undefined_instance import undefined
@@ -50,6 +51,7 @@ def __init__(
self.description: tuple | None = None
self.row: tuple | None = None
self.rows: list[tuple] | None = None
self.row_count: int | None = None
self.sample_ref: SampleRef | None = None
self.exception: BaseException | None = None
self.duration: timedelta | None = None
@@ -104,14 +106,14 @@ def execute(self):
Execute method implementations should
- invoke either self.fetchone, self.fetchall or self.store
- update the metrics with value and optionally other diagnostic information
If queries are not intended to return any data, use the QueryWithoutResults class.
"""
# TODO: some of the subclasses couple setting metric with storing the sample - refactor that.
self.fetchall()

def fetchone(self):
def _execute_cursor(self, execute=True):
"""
DataSource query execution exceptions will be caught and result in the
self.exception being populated.
Execute the SQL query and yield the cursor for further processing.
"""
self.__append_to_scan()
start = datetime.now()
@@ -120,10 +122,16 @@ def fetchone(self):
cursor = data_source.connection.cursor()
try:
self.logs.debug(f"Query {self.query_name}:\n{self.sql}")
cursor.execute(self.sql)
self.row = cursor.fetchone()
if execute:
cursor.execute(self.sql)
self.description = cursor.description
yield cursor
finally:
# Some DB implementations, like MYSQL, require the cursor's results to be
# read before closing. This is not always the case so we want to make sure
# results are reset when possible.
if hasattr(cursor, "reset"):
cursor.reset()
cursor.close()
except BaseException as e:
self.exception = e
@@ -136,103 +144,83 @@ def fetchone(self):
finally:
self.duration = datetime.now() - start

def fetchone(self):
"""
DataSource query execution exceptions will be caught and result in the
self.exception being populated.
"""
for cursor in self._execute_cursor():
self.row = cursor.fetchone()
self.row_count = 1 if self.row is not None else 0

def fetchall(self):
"""
DataSource query execution exceptions will be caught and result in the
self.exception being populated.
"""
self.__append_to_scan()
start = datetime.now()
data_source = self.data_source_scan.data_source
try:
cursor = data_source.connection.cursor()
try:
self.logs.debug(f"Query {self.query_name}:\n{self.sql}")
cursor.execute(self.sql)
self.rows = cursor.fetchall()
self.description = cursor.description
finally:
cursor.close()
except BaseException as e:
self.exception = e
self.logs.error(f"Query error: {self.query_name}: {e}\n{self.sql}", exception=e, location=self.location)
data_source.query_failed(e)
finally:
self.duration = datetime.now() - start
for cursor in self._execute_cursor():
safe_fetcher = MemorySafeCursorFetcher(cursor)
self.rows = safe_fetcher.get_rows()
self.row_count = safe_fetcher.get_row_count()

def store(self):
"""
DataSource query execution exceptions will be caught and result in the
self.exception being populated.
"""
self.__append_to_scan()
sampler: Sampler = self.data_source_scan.scan._configuration.sampler
start = datetime.now()
data_source = self.data_source_scan.data_source
try:
cursor = data_source.connection.cursor()
try:
# Check if query does not contain forbidden columns and only create sample if it does not.
# Query still needs to execute in case this is a query that also sets a metric value. (e.g. reference check)
allow_samples = True
offending_columns = []

if self.partition and self.partition.table:
query_columns = parse_columns_from_query(self.sql)

for column in query_columns:
if self.data_source_scan.data_source.is_column_excluded(
self.partition.table.table_name, column
):
allow_samples = False
offending_columns.append(column)

# A bit of a hacky workaround for queries that also set the metric in one go.
# TODO: revisit after decoupling getting metric values and storing samples. This can be dangerous, it sets the metric value
# only when metric value is not set, but this could cause weird regressions.
set_metric = False
if hasattr(self, "metric") and self.metric and self.metric.value == undefined:
set_metric = True

if set_metric or allow_samples:
self.logs.debug(f"Query {self.query_name}:\n{self.sql}")
cursor.execute(str(self.sql))
self.description = cursor.description
db_sample = DbSample(cursor, self.data_source_scan.data_source)

if set_metric:
self.metric.set_value(len(db_sample.get_rows()))

if allow_samples:
# TODO Hacky way to get the check name, check name isn't there when dataset samples are taken
check_name = next(iter(self.metric.checks)).name if hasattr(self, "metric") else None
sample_context = SampleContext(
sample=db_sample,
sample_name=self.sample_name,
query=self.sql,
data_source=self.data_source_scan.data_source,
partition=self.partition,
column=self.column,
scan=self.data_source_scan.scan,
logs=self.data_source_scan.scan._logs,
samples_limit=self.samples_limit,
passing_sql=self.passing_sql,
check_name=check_name,
)

self.sample_ref = sampler.store_sample(sample_context)
else:
self.logs.info(
f"Skipping samples from query '{self.query_name}'. Excluded column(s) present: {offending_columns}."
)
finally:
cursor.close()
except BaseException as e:
self.exception = e
self.logs.error(f"Query error: {self.query_name}: {e}\n{self.sql}", exception=e, location=self.location)
data_source.query_failed(e)
finally:
self.duration = datetime.now() - start
for cursor in self._execute_cursor(False):
# Check if query does not contain forbidden columns and only create sample if it does not.
# Query still needs to execute in case this is a query that also sets a metric value. (e.g. reference check)
allow_samples = True
offending_columns = []

if self.partition and self.partition.table:
query_columns = parse_columns_from_query(self.sql)

for column in query_columns:
if self.data_source_scan.data_source.is_column_excluded(self.partition.table.table_name, column):
allow_samples = False
offending_columns.append(column)

# A bit of a hacky workaround for queries that also set the metric in one go.
# TODO: revisit after decoupling getting metric values and storing samples. This can be dangerous, it sets the metric value
# only when metric value is not set, but this could cause weird regressions.
set_metric = False
if hasattr(self, "metric") and self.metric and self.metric.value == undefined:
set_metric = True

if set_metric or allow_samples:
self.logs.debug(f"Query {self.query_name}:\n{self.sql}")
cursor.execute(str(self.sql))
self.description = cursor.description
db_sample = DbSample(cursor, self.data_source_scan.data_source, self.samples_limit)

if set_metric:
self.metric.set_value(db_sample.get_rows_count())

if allow_samples:
# TODO Hacky way to get the check name, check name isn't there when dataset samples are taken
check_name = next(iter(self.metric.checks)).name if hasattr(self, "metric") else None
sample_context = SampleContext(
sample=db_sample,
sample_name=self.sample_name,
query=self.sql,
data_source=self.data_source_scan.data_source,
partition=self.partition,
column=self.column,
scan=self.data_source_scan.scan,
logs=self.data_source_scan.scan._logs,
samples_limit=self.samples_limit,
passing_sql=self.passing_sql,
check_name=check_name,
)

self.sample_ref = sampler.store_sample(sample_context)
else:
self.logs.info(
f"Skipping samples from query '{self.query_name}'. Excluded column(s) present: {offending_columns}."
)

def __append_to_scan(self):
scan = self.data_source_scan.scan
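
The refactor above replaces the duplicated try/except/finally blocks in fetchone, fetchall and store with a single _execute_cursor generator: callers iterate over it, receive the open cursor once, and cleanup happens in the generator's finally clause. A standalone sketch of that pattern (illustrative names, with sqlite3 used only for demonstration):

    import sqlite3
    from typing import Iterator


    def execute_cursor(connection, sql: str) -> Iterator[sqlite3.Cursor]:
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
            yield cursor           # hand the open cursor to the caller's loop body
        finally:
            cursor.close()         # runs once the loop finishes or the generator is closed


    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (n INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(3)])

    for cursor in execute_cursor(conn, "SELECT n FROM t"):   # mirrors Query.fetchone above
        print(cursor.fetchone())   # (0,)
    conn.close()
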