Skip to content

Commit 7e63535

Browse files
scholtzan and BenWu
authored
Replace dry run check in glean_usage generator to check referenced tables exist (#8083)
* Remove dry run check in glean_usage generator to check referenced tables exist * Cache baseline tables * Cache stable schemas in local directory * Cache app info in events monitoring generator * run loop instead of threading pool * use threadpool * check outdir for existing schemas * Derive metrics_clients_last_seen from metrics_clients_daily * Skip generating metrics_clients_last_seen and _daily for certain apps * Bug 1988207 - Look for schema in output_dir in glean_app_ping_views * debug * add default sql_dir * remove debug * Rename baseline_tables to base_tables in glean_usage generation * Update path-filtering orb --------- Co-authored-by: bwu <[email protected]>
1 parent cb0136b commit 7e63535

16 files changed

+182
-46
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ version: 2.1
33
setup: true
44

55
orbs:
6-
path-filtering: circleci/path-filtering@0.1.3
6+
path-filtering: circleci/path-filtering@2.0.4
77
continuation: circleci/[email protected]
88

99
parameters:

bigquery_etl/schema/stable_table_schema.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
"""Methods for working with stable table schemas."""
22

33
import json
4+
import os
5+
import pickle
46
import tarfile
7+
import tempfile
58
import urllib.request
69
from dataclasses import dataclass
710
from functools import cache
@@ -68,6 +71,23 @@ def prod_schemas_uri():
6871
def get_stable_table_schemas() -> List[SchemaFile]:
6972
"""Fetch last schema metadata per doctype by version."""
7073
schemas_uri = prod_schemas_uri()
74+
75+
# create cache file path based on the schemas URI
76+
commit_hash = schemas_uri.split("/")[-1].replace(".tar.gz", "")
77+
cache_dir = os.path.join(tempfile.gettempdir(), "bigquery_etl_schemas")
78+
os.makedirs(cache_dir, exist_ok=True)
79+
cache_file = os.path.join(cache_dir, f"schemas_{commit_hash}.pkl")
80+
81+
# check if cached file exists and load it
82+
if os.path.exists(cache_file):
83+
print(f"Loading cached schemas from {cache_file}")
84+
try:
85+
with open(cache_file, "rb") as f:
86+
return pickle.load(f)
87+
except (pickle.PickleError, EOFError, OSError) as e:
88+
print(f"Failed to load cached schemas: {e}, re-downloading...")
89+
90+
print(f"Downloading schemas from {schemas_uri}")
7191
with urllib.request.urlopen(schemas_uri) as f:
7292
tarbytes = BytesIO(f.read())
7393

@@ -132,4 +152,12 @@ def get_stable_table_schemas() -> List[SchemaFile]:
132152
)
133153
]
134154

155+
# Cache the processed schemas
156+
try:
157+
with open(cache_file, "wb") as f:
158+
pickle.dump(schemas, f)
159+
print(f"Cached schemas to {cache_file}")
160+
except (pickle.PickleError, OSError) as e:
161+
print(f"Failed to cache schemas: {e}")
162+
135163
return schemas

bqetl_project.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,32 @@ generate:
410410
- focus_android
411411
- fenix
412412
- firefox_desktop
413+
metrics_clients_last_seen:
414+
skip_apps:
415+
- accounts_cirrus
416+
- experimenter_cirrus
417+
- firefox_desktop_background_defaultagent
418+
- firefox_desktop_background_tasks
419+
- firefox_crashreporter
420+
- monitor_cirrus
421+
- mozilla_vpn
422+
- mozillavpn_backend_cirrus
423+
- pine
424+
- thunderbird_android
425+
- thunderbird_desktop
426+
clients_last_seen_joined:
427+
skip_apps:
428+
- accounts_cirrus
429+
- experimenter_cirrus
430+
- firefox_desktop_background_defaultagent
431+
- firefox_desktop_background_tasks
432+
- firefox_crashreporter
433+
- monitor_cirrus
434+
- mozilla_vpn
435+
- mozillavpn_backend_cirrus
436+
- pine
437+
- thunderbird_android
438+
- thunderbird_desktop
413439
bigconfig:
414440
skip_apps:
415441
- firefox_echo_show

sql_generators/glean_usage/__init__.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def generate(
102102

103103
@cache
104104
def get_tables(table_name="baseline_v1"):
105-
baseline_tables = list_tables(
105+
tables = list_tables(
106106
project_id=target_project,
107107
only_tables=[only] if only else None,
108108
table_filter=table_filter,
@@ -111,9 +111,9 @@ def get_tables(table_name="baseline_v1"):
111111

112112
# filter out skipped apps
113113
return [
114-
baseline_table
115-
for baseline_table in baseline_tables
116-
if baseline_table.split(".")[1]
114+
table
115+
for table in tables
116+
if table.split(".")[1]
117117
not in [
118118
f"{skipped_app}_stable"
119119
for skipped_app in ConfigLoader.get(
@@ -153,12 +153,31 @@ def get_tables(table_name="baseline_v1"):
153153
parallelism=parallelism,
154154
id_token=id_token,
155155
),
156-
baseline_table,
156+
base_table,
157157
)
158158
for table in GLEAN_TABLES
159-
for baseline_table in get_tables(table_name=table.base_table_name)
159+
for base_table in get_tables(table_name=table.base_table_name)
160160
]
161161

162+
base_tables = {}
163+
unique_base_table_names = {table.base_table_name for table in GLEAN_TABLES}
164+
for table_name in unique_base_table_names:
165+
base_tables[table_name] = get_tables(table_name=table_name)
166+
167+
def all_base_tables_exist(app_info, table_name="baseline_v1"):
168+
"""Check if baseline tables exist for all app datasets."""
169+
# Extract dataset names from table names (format: project.dataset.table)
170+
existing_datasets = {table.split(".")[1] for table in base_tables[table_name]}
171+
172+
# Check if all app datasets have corresponding tables
173+
if isinstance(app_info, dict):
174+
required_datasets = {f"{app_info['bq_dataset_family']}_stable"}
175+
else:
176+
required_datasets = {f"{app['bq_dataset_family']}_stable" for app in app_info}
177+
178+
return all(dataset in existing_datasets for dataset in required_datasets)
179+
180+
162181
# Parameters to generate per-app datasets consist of the function to be called
163182
# and app_info
164183
generate_per_app = [
@@ -170,6 +189,9 @@ def get_tables(table_name="baseline_v1"):
170189
use_cloud_function=use_cloud_function,
171190
parallelism=parallelism,
172191
id_token=id_token,
192+
all_base_tables_exist=all_base_tables_exist(info, table_name=table.base_table_name)
193+
if hasattr(table, 'per_app_requires_all_base_tables') and table.per_app_requires_all_base_tables
194+
else None
173195
),
174196
info,
175197
)

sql_generators/glean_usage/baseline_clients_daily.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ def __init__(self):
1515
self.target_table_id = BASELINE_DAILY_TABLE_ID
1616
self.prefix = PREFIX
1717
self.custom_render_kwargs = {}
18+
self.per_app_requires_all_base_tables = True

sql_generators/glean_usage/baseline_clients_first_seen.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def __init__(self):
1515
self.target_table_id = TARGET_TABLE_ID
1616
self.prefix = PREFIX
1717
self.custom_render_kwargs = {}
18+
self.per_app_requires_all_base_tables = True
1819

1920
def generate_per_app_id(
2021
self,

sql_generators/glean_usage/baseline_clients_last_seen.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ def __init__(self):
2525
self.custom_render_kwargs = dict(
2626
usage_types=USAGE_TYPES,
2727
)
28+
self.per_app_requires_all_base_tables = True

sql_generators/glean_usage/clients_last_seen_joined.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from sql_generators.glean_usage.common import GleanTable
44

5+
from bigquery_etl.config import ConfigLoader
6+
57
TARGET_TABLE_ID = "clients_last_seen_joined_v1"
68
PREFIX = "clients_last_seen_joined"
79

@@ -15,3 +17,33 @@ def __init__(self):
1517
self.target_table_id = TARGET_TABLE_ID
1618
self.per_app_id_enabled = False
1719
self.cross_channel_template = None
20+
self.per_app_requires_all_base_tables = True
21+
22+
def generate_per_app(
23+
self,
24+
project_id,
25+
app_info,
26+
output_dir=None,
27+
use_cloud_function=True,
28+
parallelism=8,
29+
id_token=None,
30+
all_base_tables_exist=None,
31+
):
32+
"""Generate per-app datasets."""
33+
skip_apps = ConfigLoader.get(
34+
"generate", "glean_usage", "clients_last_seen_joined", "skip_apps", fallback=[]
35+
)
36+
if app_info[0]["app_name"] in skip_apps:
37+
print(
38+
f"Skipping clients_last_seen_joined generation for {app_info[0]['app_name']}"
39+
)
40+
return
41+
return super().generate_per_app(
42+
project_id,
43+
app_info,
44+
output_dir,
45+
use_cloud_function,
46+
parallelism,
47+
id_token,
48+
all_base_tables_exist,
49+
)

sql_generators/glean_usage/common.py

Lines changed: 10 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -149,24 +149,6 @@ def table_names_from_baseline(baseline_table, include_project_id=True):
149149
)
150150

151151

152-
def referenced_table_exists(view_sql, id_token=None):
153-
"""Dry run the given view SQL to see if its referent exists."""
154-
dryrun = DryRun("foo/bar/view.sql", content=view_sql, id_token=id_token)
155-
# 403 is returned if referenced dataset doesn't exist; we need to check that the 403 is due to dataset not existing
156-
# since dryruns on views will also return 403 due to the table CREATE
157-
# 404 is returned if referenced table or view doesn't exist
158-
return not any(
159-
[
160-
404 == e.get("code")
161-
or (
162-
403 == e.get("code")
163-
and "bigquery.tables.create denied" not in e.get("message")
164-
)
165-
for e in dryrun.errors()
166-
]
167-
)
168-
169-
170152
def _contains_glob(patterns):
171153
return any({"*", "?", "["}.intersection(pattern) for pattern in patterns)
172154

@@ -204,6 +186,7 @@ def __init__(self):
204186
self.custom_render_kwargs = {}
205187
self.per_app_id_enabled = True
206188
self.per_app_enabled = True
189+
self.per_app_requires_all_base_tables = False
207190
self.across_apps_enabled = True
208191
self.cross_channel_template = "cross_channel.view.sql"
209192
self.base_table_name = "baseline_v1"
@@ -361,10 +344,7 @@ def generate_per_app_id(
361344
if query_python:
362345
artifacts.append(Artifact(table, "query.py", query_python))
363346

364-
if not (referenced_table_exists(view_sql, id_token)):
365-
logging.info("Skipping view for table which doesn't exist:" f" {table}")
366-
else:
367-
artifacts.append(Artifact(view, "view.sql", view_sql))
347+
artifacts.append(Artifact(view, "view.sql", view_sql))
368348

369349
skip_existing_artifact = self.skip_existing(output_dir, project_id)
370350

@@ -409,15 +389,22 @@ def generate_per_app(
409389
use_cloud_function=True,
410390
parallelism=8,
411391
id_token=None,
392+
all_base_tables_exist=None,
412393
):
413394
"""Generate the baseline table query per app_name."""
414395
if not self.per_app_enabled:
415396
return
416-
397+
417398
app_name = app_info[0]["app_name"]
418399

419400
target_view_name = "_".join(self.target_table_id.split("_")[:-1])
420401
target_dataset = app_name
402+
403+
if self.per_app_requires_all_base_tables and not all_base_tables_exist:
404+
logging.info(
405+
f"Skipping per-app generation for {target_dataset}.{target_view_name} as not all baseline tables exist"
406+
)
407+
return
421408

422409
datasets = [
423410
(a["bq_dataset_family"], a.get("app_channel", "release")) for a in app_info
@@ -465,10 +452,6 @@ def generate_per_app(
465452
)
466453
view = f"{project_id}.{target_dataset}.{target_view_name}"
467454

468-
if not (referenced_table_exists(sql, id_token=id_token)):
469-
logging.info("Skipping view for table which doesn't exist:" f" {view}")
470-
return
471-
472455
if output_dir:
473456
write_dataset_metadata(output_dir, view)
474457

@@ -500,13 +483,6 @@ def generate_per_app(
500483
table = f"{project_id}.{target_dataset}_derived.{self.target_table_id}"
501484
view = f"{project_id}.{target_dataset}.{target_view_name}"
502485

503-
if not (referenced_table_exists(query_sql, id_token=id_token)):
504-
logging.info(
505-
"Skipping query for table which doesn't exist:"
506-
f" {self.target_table_id}"
507-
)
508-
return
509-
510486
if output_dir:
511487
artifacts = [
512488
Artifact(table, "query.sql", query_sql),

sql_generators/glean_usage/event_monitoring_live.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ def generate_per_app_id(
9494
parallelism=8,
9595
id_token=None,
9696
):
97+
# Cache app_info to avoid repeated calls
98+
cached_app_info = get_app_info()
99+
97100
# Get the app ID from the baseline_table name.
98101
# This is what `common.py` also does.
99102
app_id = re.sub(r"_stable\..+", "", baseline_table)
@@ -120,7 +123,7 @@ def generate_per_app_id(
120123

121124
app_name = [
122125
app_dataset["app_name"]
123-
for _, app in get_app_info().items()
126+
for _, app in cached_app_info.items()
124127
for app_dataset in app
125128
if dataset == app_dataset["bq_dataset_family"]
126129
][0]
@@ -130,7 +133,7 @@ def generate_per_app_id(
130133
else:
131134
v1_name = [
132135
app_dataset["v1_name"]
133-
for _, app in get_app_info().items()
136+
for _, app in cached_app_info.items()
134137
for app_dataset in app
135138
if dataset == app_dataset["bq_dataset_family"]
136139
][0]
@@ -166,7 +169,7 @@ def generate_per_app_id(
166169
current_date=datetime.today().strftime("%Y-%m-%d"),
167170
app_name=[
168171
app_dataset["canonical_app_name"]
169-
for _, app in get_app_info().items()
172+
for _, app in cached_app_info.items()
170173
for app_dataset in app
171174
if dataset == app_dataset["bq_dataset_family"]
172175
][0],
@@ -227,6 +230,9 @@ def generate_across_apps(
227230
if not self.across_apps_enabled:
228231
return
229232

233+
# Cache app_info to avoid repeated calls
234+
cached_app_info = get_app_info()
235+
230236
aggregate_table = "event_monitoring_aggregates_v1"
231237
target_view_name = "_".join(self.target_table_id.split("_")[:-1])
232238

@@ -249,7 +255,7 @@ def generate_across_apps(
249255
dataset = app_dataset["bq_dataset_family"]
250256
app_name = [
251257
app_dataset["app_name"]
252-
for _, app in get_app_info().items()
258+
for _, app in cached_app_info.items()
253259
for app_dataset in app
254260
if dataset == app_dataset["bq_dataset_family"]
255261
][0]
@@ -261,7 +267,7 @@ def generate_across_apps(
261267
else:
262268
v1_name = [
263269
app_dataset["v1_name"]
264-
for _, app in get_app_info().items()
270+
for _, app in cached_app_info.items()
265271
for app_dataset in app
266272
if dataset == app_dataset["bq_dataset_family"]
267273
][0]

0 commit comments

Comments (0)