Commit e4ec61f

Dryrun caching
1 parent 084db72 commit e4ec61f

6 files changed: +332, -11 lines changed

bigquery_etl/cli/query.py

Lines changed: 3 additions & 0 deletions
@@ -2300,6 +2300,9 @@ def _update_query_schema(
     query_schema = Schema.from_query_file(
         query_file_path,
         content=sql_content,
+        project=project_name,
+        dataset=dataset_name,
+        table=table_name,
         use_cloud_function=use_cloud_function,
         respect_skip=respect_dryrun_skip,
         sql_dir=sql_dir,
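Sketch of this call site after the change (values are hypothetical; this assumes Schema.from_query_file forwards project/dataset/table down to DryRun so the cache key and table-metadata cache introduced below are scoped to the destination table):

    from bigquery_etl.schema import Schema

    # Hypothetical arguments, mirroring the keyword arguments in the hunk above.
    schema = Schema.from_query_file(
        "sql/moz-fx-data-shared-prod/telemetry_derived/foo_v1/query.sql",
        content="SELECT 1 AS col",
        project="moz-fx-data-shared-prod",
        dataset="telemetry_derived",
        table="foo_v1",
        use_cloud_function=False,
    )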

bigquery_etl/dryrun.py

Lines changed: 133 additions & 8 deletions
@@ -12,10 +12,14 @@
 """

 import glob
+import hashlib
 import json
+import os
+import pickle
 import random
 import re
 import sys
+import tempfile
 import time
 from enum import Enum
 from os.path import basename, dirname, exists
@@ -106,12 +110,12 @@ def __init__(
         dataset=None,
         table=None,
         billing_project=None,
-        ignore_content=False,
+        use_cache=True,
     ):
         """Instantiate DryRun class."""
         self.sqlfile = sqlfile
         self.content = content
-        self.ignore_content = ignore_content
+        self.use_cache = use_cache
         self.query_parameters = query_parameters
         self.strip_dml = strip_dml
         self.use_cloud_function = use_cloud_function
@@ -227,16 +231,125 @@ def get_sql(self):

         return sql

+    def _get_cache_key(self, sql):
+        """Generate cache key based on SQL content and other parameters."""
+        cache_input = f"{sql}|{self.project}|{self.dataset}|{self.table}"
+        return hashlib.sha256(cache_input.encode()).hexdigest()
+
+    def _get_cached_result(self, cache_key, ttl_seconds=None):
+        """Load cached dry run result from disk."""
+        if ttl_seconds is None:
+            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)
+
+        cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
+        os.makedirs(cache_dir, exist_ok=True)
+        cache_file = os.path.join(cache_dir, f"dryrun_{cache_key}.pkl")
+
+        if os.path.exists(cache_file):
+            # check if cache is expired
+            file_age = time.time() - os.path.getmtime(cache_file)
+            if file_age > ttl_seconds:
+                try:
+                    os.remove(cache_file)
+                except OSError:
+                    pass
+                return None
+
+            try:
+                with open(cache_file, "rb") as f:
+                    cached_data = pickle.load(f)
+                cache_age = time.time() - os.path.getmtime(cache_file)
+                print(f"[DRYRUN CACHE HIT] {self.sqlfile} (age: {cache_age:.0f}s)")
+                return cached_data
+            except (pickle.PickleError, EOFError, OSError) as e:
+                print(f"[DRYRUN CACHE] Failed to load cache: {e}")
+                return None
+
+        return None
+
+    def _save_cached_result(self, cache_key, result):
+        """Save dry run result to disk cache."""
+        cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
+        os.makedirs(cache_dir, exist_ok=True)
+        cache_file = os.path.join(cache_dir, f"dryrun_{cache_key}.pkl")
+
+        try:
+            with open(cache_file, "wb") as f:
+                pickle.dump(result, f)
+
+            # save table metadata separately if present
+            if (
+                result
+                and "tableMetadata" in result
+                and self.project
+                and self.dataset
+                and self.table
+            ):
+                table_identifier = f"{self.project}.{self.dataset}.{self.table}"
+                self._save_cached_table_metadata(
+                    table_identifier, result["tableMetadata"]
+                )
+        except (pickle.PickleError, OSError) as e:
+            print(f"[DRYRUN CACHE] Failed to save cache: {e}")
+
+    def _get_cached_table_metadata(self, table_identifier, ttl_seconds=None):
+        """Load cached table metadata from disk based on table identifier."""
+        if ttl_seconds is None:
+            ttl_seconds = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)
+
+        cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
+        os.makedirs(cache_dir, exist_ok=True)
+        # table identifier as cache key
+        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
+        cache_file = os.path.join(cache_dir, f"table_metadata_{table_cache_key}.pkl")
+
+        if os.path.exists(cache_file):
+            # check if cache is expired
+            file_age = time.time() - os.path.getmtime(cache_file)
+
+            if file_age > ttl_seconds:
+                try:
+                    os.remove(cache_file)
+                except OSError:
+                    pass
+                return None
+
+            try:
+                with open(cache_file, "rb") as f:
+                    cached_data = pickle.load(f)
+                return cached_data
+            except (pickle.PickleError, EOFError, OSError):
+                return None
+        return None
+
+    def _save_cached_table_metadata(self, table_identifier, metadata):
+        """Save table metadata to disk cache."""
+        cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
+        os.makedirs(cache_dir, exist_ok=True)
+        table_cache_key = hashlib.sha256(table_identifier.encode()).hexdigest()
+        cache_file = os.path.join(cache_dir, f"table_metadata_{table_cache_key}.pkl")
+
+        try:
+            with open(cache_file, "wb") as f:
+                pickle.dump(metadata, f)
+        except (pickle.PickleError, OSError) as e:
+            print(f"[TABLE METADATA] Failed to save cache for {table_identifier}: {e}")
+
     @cached_property
     def dry_run_result(self):
         """Dry run the provided SQL file."""
-        if self.ignore_content:
-            sql = None
+        if self.content:
+            sql = self.content
         else:
-            if self.content:
-                sql = self.content
-            elif self.content != "":
-                sql = self.get_sql()
+            sql = self.get_sql()
+
+        # Check cache first (if caching is enabled)
+        if sql is not None and self.use_cache:
+            cache_key = self._get_cache_key(sql)
+            cached_result = self._get_cached_result(cache_key)
+            if cached_result is not None:
+                self.dry_run_duration = 0  # Cached result, no actual dry run
+                return cached_result

         query_parameters = []
         if self.query_parameters:
@@ -356,6 +469,11 @@ def dry_run_result(self):
                 }

             self.dry_run_duration = time.time() - start_time
+
+            # Save to cache (if caching is enabled)
+            if self.use_cache:
+                self._save_cached_result(cache_key, result)
+
             return result

         except Exception as e:
@@ -481,6 +599,13 @@ def get_table_schema(self):
         ):
             return self.dry_run_result["tableMetadata"]["schema"]

+        # Check if table metadata is cached (if caching is enabled)
+        if self.use_cache and self.project and self.dataset and self.table:
+            table_identifier = f"{self.project}.{self.dataset}.{self.table}"
+            cached_metadata = self._get_cached_table_metadata(table_identifier)
+            if cached_metadata:
+                return cached_metadata["schema"]
+
         return []

     def get_dataset_labels(self):

bigquery_etl/schema/__init__.py

Lines changed: 0 additions & 2 deletions
@@ -78,8 +78,6 @@ def for_table(
             project=project,
             dataset=dataset,
             table=table,
-            respect_skip=False,
-            ignore_content=True,
             *args,
             **kwargs,
         ).get_table_schema()

bigquery_etl/schema/stable_table_schema.py

Lines changed: 18 additions & 1 deletion
@@ -3,6 +3,7 @@
 import json
 import os
 import pickle
+import shutil
 import tarfile
 import tempfile
 import urllib.request
@@ -51,6 +52,17 @@ def sortkey(self):
         )


+def _clear_dryrun_cache():
+    """Clear dry run cache when new schemas are downloaded."""
+    cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
+    if os.path.exists(cache_dir):
+        try:
+            shutil.rmtree(cache_dir)
+            print(f"Cleared dry run cache at {cache_dir}")
+        except OSError as e:
+            print(f"Warning: Failed to clear dry run cache: {e}")
+
+
 def prod_schemas_uri():
     """Return URI for the schemas tarball deployed to shared-prod.

@@ -59,7 +71,7 @@ def prod_schemas_uri():
     with the most recent production schemas deploy.
     """
     dryrun = DryRun(
-        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql", content="SELECT 1"
+        "moz-fx-data-shared-prod/telemetry_derived/foo/query.sql", content="SELECT 1", use_cache=False
     )
     build_id = dryrun.get_dataset_labels()["schemas_build_id"]
     commit_hash = build_id.split("_")[-1]
@@ -88,6 +100,11 @@ def get_stable_table_schemas() -> List[SchemaFile]:
             print(f"Failed to load cached schemas: {e}, re-downloading...")

     print(f"Downloading schemas from {schemas_uri}")
+
+    # Clear dry run cache when downloading new schemas
+    # Schema changes could affect dry run results
+    _clear_dryrun_cache()
+
     with urllib.request.urlopen(schemas_uri) as f:
         tarbytes = BytesIO(f.read())

bqetl_project.yaml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ dry_run:
   function_accounts:
     - bigquery-etl-dryrun@moz-fx-data-shared-prod.iam.gserviceaccount.com
     - bigquery-etl-dryrun@moz-fx-data-shar-nonprod-efed.iam.gserviceaccount.com
+  cache_ttl_seconds: 900  # Cache dry run results for 15 minutes (900 seconds)
   skip:
     ## skip all data-observability-dev queries due to CI lacking permissions in that project.
     # TODO: once data observability platform assessment concludes this should be removed.
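For context, this is the TTL the new cache helpers in dryrun.py read via ConfigLoader. A minimal sketch of how the setting maps to the on-disk cache (the ConfigLoader import path is an assumption; values and paths match the diff above):

    import os
    import tempfile

    from bigquery_etl.config import ConfigLoader  # assumed import path

    # TTL used by DryRun._get_cached_result / _get_cached_table_metadata above.
    ttl = ConfigLoader.get("dry_run", "cache_ttl_seconds", fallback=900)

    # Cache files live in the system temp directory, as in dryrun.py above.
    cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_dryrun_cache")
    print(f"dry run results are cached under {cache_dir} for up to {ttl} seconds")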
