diff --git a/.circleci/workflows.yml b/.circleci/workflows.yml
index 6706c429a7c..4b2f385ebd8 100644
--- a/.circleci/workflows.yml
+++ b/.circleci/workflows.yml
@@ -35,10 +35,16 @@ executors:
   machine:
     image: ubuntu-2404:current
 
+# environment variables for caching
+environment: &cache_environment
+  DRYRUN_CACHE_ENABLED: "true"
+  DRYRUN_CACHE_DIR: "/tmp/workspace/dryrun_cache"
+
 jobs:
   build:
     docker: &docker
       - image: python:<< pipeline.parameters.python-version >>
+        environment: *cache_environment
     steps:
       - checkout
       - &restore_venv_cache
@@ -166,7 +172,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - &attach_generated_sql
+      - &attach_workspace_artifacts
         attach_workspace:
           at: /tmp/workspace
       - &copy_staged_sql
@@ -181,6 +187,11 @@ jobs:
           name: Run SQL tests
           command: |
            PATH="venv/bin:$PATH" script/entrypoint -m sql -n 8 -p no:bigquery_etl.pytest_plugin.routine
+      - &persist_dryrun_cache
+        persist_to_workspace:
+          root: /tmp/workspace
+          paths:
+            - dryrun_cache
       - &copy_debug_sql
         run:
           name: Copy generated SQL to save for debugging
@@ -214,7 +225,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_staged_sql
       - *authenticate
       - run:
@@ -242,8 +253,9 @@ jobs:
              PATHS="sql/bigquery-etl-integration-test"
            fi
            echo $PATHS
-            PATH="venv/bin:$PATH" script/bqetl dryrun --validate-schemas $PATHS
+            PATH="venv/bin:$PATH" script/bqetl dryrun --use-cache --validate-schemas $PATHS
            # yamllint enable rule:line-length
+      - *persist_dryrun_cache
       - *copy_debug_sql
       - *store_debug_artifacts
       - unless:
@@ -262,7 +274,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - &copy_generated_sql
         run:
           name: Move generated-sql into place
@@ -293,7 +305,7 @@ jobs:
          name: Validate workgroup access configuration on main
          command: |
            PATH="venv/bin:$PATH" script/bqetl metadata validate-workgroups sql/
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_staged_sql
       - *authenticate
       - run:
@@ -341,7 +353,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *authenticate
       - &add_private_bigquery_etl_ssh_keys
         add_ssh_keys:
@@ -391,7 +403,7 @@ jobs:
          name: Pull telemetry-airflow
          command: |
            git clone https://github.com/mozilla/telemetry-airflow.git ~/telemetry-airflow
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_generated_sql
       - run:
          name: Replace telemetry-airflow DAGs with BigQuery ETL DAGs
@@ -441,7 +453,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_staged_sql
       - run:
          name: Run routine tests
@@ -468,7 +480,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_staged_sql
       - *authenticate
       - run:
@@ -491,7 +503,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - add_ssh_keys:
          fingerprints:
            - "22:b9:3c:1b:82:ab:3f:e4:b5:79:70:d1:7b:b9:28:d2"
@@ -520,6 +532,7 @@ jobs:
       - *restore_venv_cache
       - *build
       - *authenticate
+      - *attach_workspace_artifacts
       - run:
          name: Generate SQL content
          command: |
@@ -614,6 +627,7 @@ jobs:
          root: /tmp/workspace
          paths:
            - generated-sql
+            - dryrun_cache
       - unless:
          condition: *validate-sql-or-routines
          steps:
@@ -628,7 +642,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *copy_generated_sql
       - add_ssh_keys:
          fingerprints:
@@ -664,6 +678,7 @@ jobs:
          root: /tmp/workspace
          paths:
            - staged-generated-sql
+            - dryrun_cache
       - *copy_debug_sql
       - *store_debug_artifacts
       - unless:
@@ -678,7 +693,7 @@ jobs:
        or:
          - << pipeline.parameters.deploy >>
    steps:
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - add_ssh_keys:
          fingerprints:
            - "22:b9:3c:1b:82:ab:3f:e4:b5:79:70:d1:7b:b9:28:d2"
@@ -790,7 +805,7 @@ jobs:
       - checkout
       - *restore_venv_cache
       - *build
-      - *attach_generated_sql
+      - *attach_workspace_artifacts
       - *authenticate
       - *add_private_bigquery_etl_ssh_keys
       - run:
@@ -824,6 +839,7 @@ jobs:
          paths:
            - private-generated-sql
            - PRIVATE_BIGQUERY_ETL_SHA
+            - dryrun_cache
       - unless:
          condition: *validate-sql
          steps:
@@ -834,7 +850,7 @@ jobs:
       - when:
          condition: *deploy
          steps:
-            - *attach_generated_sql
+            - *attach_workspace_artifacts
            - *add_private_bigquery_etl_ssh_keys
            - run:
                name: Push to private-generated-sql branch
@@ -871,7 +887,7 @@ jobs:
          condition: *deploy
          steps:
            - checkout
-            - *attach_generated_sql
+            - *attach_workspace_artifacts
            - run:
                name: Move generated-sql into place
                command: |
diff --git a/bigquery_etl/cli/dryrun.py b/bigquery_etl/cli/dryrun.py
index c4f03eec4e4..d67d32c7621 100644
--- a/bigquery_etl/cli/dryrun.py
+++ b/bigquery_etl/cli/dryrun.py
@@ -64,6 +64,23 @@
     help="GCP project to perform dry run in when --use_cloud_function=False",
     default=ConfigLoader.get("default", "project", fallback="moz-fx-data-shared-prod"),
 )
+@click.option(
+    "--use-cache/--no-cache",
+    help="Enable or disable local caching of dry run results. Default is --no-cache.",
+    default=ConfigLoader.get("dry_run", "cache_enabled", fallback=False),
+)
+@click.option(
+    "--cache-ttl-hours",
+    help="Cache time-to-live in hours. Default is 1 hour.",
+    type=int,
+    default=ConfigLoader.get("dry_run", "cache_ttl_hours", fallback=1),
+)
+@click.option(
+    "--cache-dir",
+    help="Directory to store cache files. Default is system temp directory.",
+    type=click.Path(),
+    default=None,
+)
 @billing_project_option()
 def dryrun(
     paths: List[str],
@@ -71,6 +88,9 @@ def dryrun(
     validate_schemas: bool,
     respect_skip: bool,
     project: str,
+    use_cache: bool,
+    cache_ttl_hours: int,
+    cache_dir: str,
     billing_project: str,
 ):
     """Perform a dry run."""
@@ -118,6 +138,9 @@ def dryrun(
                 use_cloud_function,
                 respect_skip,
                 validate_schemas,
+                use_cache,
+                cache_ttl_hours,
+                cache_dir,
                 credentials=credentials,
                 id_token=id_token,
                 billing_project=billing_project,
@@ -141,6 +164,9 @@ def _sql_file_valid(
     use_cloud_function,
     respect_skip,
     validate_schemas,
+    use_cache,
+    cache_ttl_hours,
+    cache_dir,
     sqlfile,
     credentials,
     id_token,
@@ -154,6 +180,9 @@ def _sql_file_valid(
         respect_skip=respect_skip,
         id_token=id_token,
         billing_project=billing_project,
+        cache_enabled=use_cache,
+        cache_ttl_hours=cache_ttl_hours,
+        cache_dir=cache_dir,
     )
     if validate_schemas:
         try:
diff --git a/bigquery_etl/dryrun.py b/bigquery_etl/dryrun.py
index 4448f4ec365..5b7fa504d2c 100644
--- a/bigquery_etl/dryrun.py
+++ b/bigquery_etl/dryrun.py
@@ -12,13 +12,17 @@
 """
 
 import glob
+import hashlib
 import json
+import os
+import pickle
 import random
 import re
 import sys
+import tempfile
 import time
 from enum import Enum
-from os.path import basename, dirname, exists
+from os.path import basename, dirname, exists, getmtime
 from pathlib import Path
 from typing import Optional, Set
 from urllib.request import Request, urlopen
@@ -105,6 +109,9 @@ def __init__(
         dataset=None,
         table=None,
         billing_project=None,
+        cache_enabled=ConfigLoader.get("dry_run", "cache_enabled", fallback=False),
+        cache_ttl_hours=ConfigLoader.get("dry_run", "cache_ttl_hours", fallback=1),
+        cache_dir=None,
     ):
         """Instantiate DryRun class."""
         self.sqlfile = sqlfile
@@ -147,6 +154,42 @@ def __init__(
             )
             sys.exit(1)
 
+        # set cache directory - prioritize: explicit parameter > env var > temp directory
+        if cache_dir:
+            self.cache_dir = cache_dir
+        elif os.environ.get("DRYRUN_CACHE_DIR"):
+            self.cache_dir = os.environ.get("DRYRUN_CACHE_DIR")
+        else:
+            self.cache_dir = os.path.join(
+                tempfile.gettempdir(), "bigquery_etl_dryrun_cache"
+            )
+
+        # check for global cache enable via environment variable
+        env_cache_enabled = os.environ.get("DRYRUN_CACHE_ENABLED", "").lower() in (
+            "true",
+            "1",
+            "yes",
+        )
+        self.cache_enabled = cache_enabled or env_cache_enabled
+
+        # set cache TTL - prioritize: explicit parameter > env var > default
+        if cache_ttl_hours is not None:
+            self.cache_ttl_hours = cache_ttl_hours
+        elif os.environ.get("DRYRUN_CACHE_TTL_HOURS"):
+            try:
+                self.cache_ttl_hours = float(os.environ.get("DRYRUN_CACHE_TTL_HOURS"))
+            except (ValueError, TypeError):
+                self.cache_ttl_hours = ConfigLoader.get(
+                    "dry_run", "cache_ttl_hours", fallback=1
+                )
+        else:
+            self.cache_ttl_hours = ConfigLoader.get(
+                "dry_run", "cache_ttl_hours", fallback=1
+            )
+
+        if self.cache_enabled:
+            os.makedirs(self.cache_dir, exist_ok=True)
+
     @cached_property
     def client(self):
         """Get BigQuery client instance."""
@@ -196,6 +239,88 @@ def skip(self):
             sql_dir=self.sql_dir
         )
 
+    def _get_cache_key(self, sql):
+        """Generate cache key based on SQL content and file modification time."""
+        # create hash from SQL content and file modification time
+        content_hash = hashlib.sha256(sql.encode("utf-8")).hexdigest()
+
+        # include relevant parameters that affect the dry run result
+        cache_params = {
+            "content_hash": content_hash,
+            "use_cloud_function": self.use_cloud_function,
+            "project": self.project,
+            "dataset": self.dataset,
"table": self.table, + "strip_dml": self.strip_dml, + "cache_ttl_hours": self.cache_ttl_hours, + } + + print(f"Generated cache key for {self.sqlfile} ({cache_params})") + + cache_string = json.dumps(cache_params, sort_keys=True) + return hashlib.sha256(cache_string.encode("utf-8")).hexdigest()[:16] + + def _get_cache_file_path(self, cache_key): + """Get the full path to the cache file.""" + return os.path.join(self.cache_dir, f"dryrun_cache_{cache_key}.pkl") + + def _is_cache_valid(self, cache_file_path): + """Check if cache file exists and is within TTL.""" + if not cache_file_path or not exists(cache_file_path): + return False + + try: + cache_age_hours = (time.time() - getmtime(cache_file_path)) / 3600 + return cache_age_hours < self.cache_ttl_hours + except OSError: + return False + + def _load_from_cache(self, cache_key): + """Load dry run result from cache if valid.""" + if not self.cache_enabled: + return None + + cache_file_path = self._get_cache_file_path(cache_key) + + if not self._is_cache_valid(cache_file_path): + return None + + try: + with open(cache_file_path, "rb") as f: + cached_result = pickle.load(f) + print(f"Using cached dry run result for {self.sqlfile} ({cache_key})") + return cached_result + except (pickle.PickleError, OSError, EOFError) as e: + print(f"Failed to load cache for {self.sqlfile}: {e}") + # remove corrupted cache file + try: + os.remove(cache_file_path) + except OSError: + pass + return None + + def _save_to_cache(self, cache_key, result): + """Save dry run result to cache.""" + if not self.cache_enabled: + return + + cache_file_path = self._get_cache_file_path(cache_key) + + try: + temp_file_path = cache_file_path + ".tmp" + with open(temp_file_path, "wb") as f: + pickle.dump(result, f) + os.rename(temp_file_path, cache_file_path) + print(f"Cached dry run result for {self.sqlfile} ({cache_key})") + except (pickle.PickleError, OSError) as e: + print(f"Failed to cache result for {self.sqlfile}: {e}") + # clean up temp file if it exists + try: + if os.path.exists(temp_file_path): + os.remove(temp_file_path) + except OSError: + pass + def get_sql(self): """Get SQL content.""" if exists(self.sqlfile): @@ -231,6 +356,12 @@ def dry_run_result(self): else: sql = self.get_sql() + # check cache first + cache_key = self._get_cache_key(sql) + cached_result = self._load_from_cache(cache_key) + if cached_result is not None: + return cached_result + query_parameters = [] scheduling_metadata = self.metadata.scheduling if self.metadata else {} if date_partition_parameter := scheduling_metadata.get( @@ -338,6 +469,9 @@ def dry_run_result(self): } self.dry_run_duration = time.time() - start_time + + self._save_to_cache(cache_key, result) + return result except Exception as e: diff --git a/bigquery_etl/metadata/parse_metadata.py b/bigquery_etl/metadata/parse_metadata.py index 1c1c0f38cca..714c90384af 100644 --- a/bigquery_etl/metadata/parse_metadata.py +++ b/bigquery_etl/metadata/parse_metadata.py @@ -277,6 +277,9 @@ def from_file(cls, metadata_file): monitoring = None require_column_descriptions = False + if metadata_file is None or not os.path.exists(metadata_file): + raise FileNotFoundError(f"Metadata file {metadata_file} not found.") + with open(metadata_file, "r") as yaml_stream: try: metadata = yaml.safe_load(yaml_stream) diff --git a/bqetl_project.yaml b/bqetl_project.yaml index c07929c6300..29552e32f0e 100644 --- a/bqetl_project.yaml +++ b/bqetl_project.yaml @@ -32,6 +32,8 @@ dry_run: function_accounts: - 
     - bigquery-etl-dryrun@moz-fx-data-shared-prod.iam.gserviceaccount.com
     - bigquery-etl-dryrun@moz-fx-data-shar-nonprod-efed.iam.gserviceaccount.com
+  cache_enabled: false
+  cache_ttl_hours: 1
   skip:
     ## skip all data-observability-dev queries due to CI lacking permissions in that project.
     # TODO: once data observability platform assessment concludes this should be removed.
diff --git a/sql_generators/README.md b/sql_generators/README.md
index 3b2ceef8a28..751c9bf7a80 100644
--- a/sql_generators/README.md
+++ b/sql_generators/README.md
@@ -9,3 +9,4 @@ The directories in `sql_generators/` represent the generated queries and will co
 Each `__init__.py` file needs to implement a `generate()` method that is configured as a [click command](https://click.palletsprojects.com/en/8.0.x/). The `bqetl` CLI will automatically add these commands to the `./bqetl query generate` command group.
 
 After changes to a schema or adding new tables, the schema is automatically derived from the query and deployed the next day in DAG [bqetl_artifact_deployment](https://workflow.telemetry.mozilla.org/dags/bqetl_artifact_deployment/grid). Alternatively, it can be manually generated and deployed using `./bqetl generate all` and `./bqetl query schema deploy`.
+
diff --git a/tests/test_dryrun.py b/tests/test_dryrun.py
index 21a9f7a0848..1be396d20ce 100644
--- a/tests/test_dryrun.py
+++ b/tests/test_dryrun.py
@@ -1,4 +1,6 @@
 import os
+import time
+from unittest.mock import patch
 
 import pytest
 
@@ -192,3 +194,175 @@ def test_dryrun_metrics_query(self, tmp_query_path):
 
         dryrun = DryRun(sqlfile=str(query_file))
         assert dryrun.is_valid()
+
+    def test_cache_key_includes_ttl(self, tmp_query_path):
+        """Test that cache key includes TTL value so different TTLs create different cache entries."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        # Create two DryRun instances with different TTLs
+        dry_run1 = DryRun(str(query_file), cache_enabled=True, cache_ttl_hours=1)
+
+        dry_run2 = DryRun(str(query_file), cache_enabled=True, cache_ttl_hours=6)
+
+        # Get cache keys for the same SQL content
+        sql = dry_run1.get_sql()
+        cache_key1 = dry_run1._get_cache_key(sql)
+        cache_key2 = dry_run2._get_cache_key(sql)
+
+        # Cache keys should be different due to different TTL values
+        assert cache_key1 != cache_key2
+
+    def test_cache_file_path_generation(self, tmp_query_path):
+        """Test that cache file paths are generated correctly."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        dry_run = DryRun(str(query_file), cache_enabled=True, cache_ttl_hours=1)
+
+        sql = dry_run.get_sql()
+        cache_key = dry_run._get_cache_key(sql)
+        cache_file_path = dry_run._get_cache_file_path(cache_key)
+
+        # Should be in temp directory with correct naming pattern
+        assert cache_file_path.startswith(dry_run.cache_dir)
+        assert f"dryrun_cache_{cache_key}.pkl" in cache_file_path
+
+    def test_cache_validity_checks(self, tmp_query_path):
+        """Test cache validity checking logic."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        dry_run = DryRun(str(query_file), cache_enabled=True, cache_ttl_hours=1)
+
+        # Test with non-existent file
+        assert not dry_run._is_cache_valid("/non/existent/file.pkl")
+
+        # Test with None path
+        assert not dry_run._is_cache_valid(None)
+
+        # Test with empty string path
+        assert not dry_run._is_cache_valid("")
+
+    @patch("bigquery_etl.dryrun.exists")
+    @patch("bigquery_etl.dryrun.getmtime")
+    def test_cache_ttl_expiry(self, mock_getmtime, mock_exists, tmp_query_path):
+        """Test that cache correctly expires based on TTL."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        # Create metadata.yaml to avoid FileNotFoundError
+        metadata_file = tmp_query_path / "metadata.yaml"
+        metadata_file.write_text(
+            """friendly_name: Test Table
+description: Test description
+owners:
+  - test@example.com
+"""
+        )
+
+        dry_run = DryRun(
+            str(query_file), cache_enabled=True, cache_ttl_hours=1  # 1 hour TTL
+        )
+
+        cache_file_path = "/fake/cache/file.pkl"
+        current_time = time.time()
+
+        # Test cache within TTL (30 minutes old)
+        mock_exists.return_value = True
+        mock_getmtime.return_value = current_time - (30 * 60)  # 30 minutes ago
+        assert dry_run._is_cache_valid(cache_file_path)
+
+        # Test cache beyond TTL (2 hours old)
+        mock_getmtime.return_value = current_time - (2 * 60 * 60)  # 2 hours ago
+        assert not dry_run._is_cache_valid(cache_file_path)
+
+    def test_cache_key_different_for_different_content(self, tmp_query_path):
+        """Test that different SQL content generates different cache keys."""
+        query_file = tmp_query_path / "query.sql"
+
+        # Create metadata.yaml to avoid FileNotFoundError
+        metadata_file = tmp_query_path / "metadata.yaml"
+        metadata_file.write_text(
+            """friendly_name: Test Table
+description: Test description
+owners:
+  - test@example.com
+"""
+        )
+
+        dry_run = DryRun(str(query_file), cache_enabled=True, cache_ttl_hours=1)
+
+        # Different SQL content should produce different cache keys
+        sql1 = "SELECT 123"
+        sql2 = "SELECT 456"
+
+        cache_key1 = dry_run._get_cache_key(sql1)
+        cache_key2 = dry_run._get_cache_key(sql2)
+
+        assert cache_key1 != cache_key2
+
+    def test_cache_key_includes_parameters(self, tmp_query_path):
+        """Test that cache key includes all relevant parameters."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        # Create two DryRun instances with different parameters
+        dry_run1 = DryRun(
+            str(query_file),
+            cache_enabled=True,
+            project="project1",
+            dataset="dataset1",
+            billing_project="billing1",
+        )
+
+        dry_run2 = DryRun(
+            str(query_file),
+            cache_enabled=True,
+            project="project2",
+            dataset="dataset2",
+            billing_project="billing2",
+        )
+
+        sql = dry_run1.get_sql()
+        cache_key1 = dry_run1._get_cache_key(sql)
+        cache_key2 = dry_run2._get_cache_key(sql)
+
+        # Different parameters should produce different cache keys
+        assert cache_key1 != cache_key2
+
+    def test_cache_load_success(self, tmp_query_path):
+        """Test successful cache loading."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        dry_run = DryRun(
+            str(query_file),
+            cache_enabled=True,
+            project="project1",
+            dataset="dataset1",
+            billing_project="billing1",
+        )
+
+        expected_result = {"valid": True, "cached": True}
+        dry_run._save_to_cache("test_cache_key", expected_result)
+
+        result = dry_run._load_from_cache("test_cache_key")
+        assert result == expected_result
+
+    @patch("bigquery_etl.metadata.parse_metadata.Metadata.of_query_file")
+    def test_cache_disabled_behavior(self, mock_metadata, tmp_query_path):
+        """Test that caching is properly disabled when cache_enabled=False."""
+        query_file = tmp_query_path / "query.sql"
+        query_file.write_text("SELECT 123")
+
+        # Mock metadata loading to avoid FileNotFoundError
+        mock_metadata.return_value = None
+
+        dry_run = DryRun(str(query_file), cache_enabled=False, cache_ttl_hours=1)
+
+        # Cache operations should return None when disabled
+        assert dry_run._load_from_cache("test_key") is None
+
+        # Save to cache should do nothing when disabled
+        dry_run._save_to_cache("test_key", {"test": "data"})  # Should not raise error
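
Reviewer note (not part of the patch): a minimal usage sketch of the cache options introduced above. The flags and environment variables are the ones added in this diff; the cache directory and target path are illustrative only. Results are keyed on a hash of the rendered SQL plus the dry run parameters and expire after --cache-ttl-hours, so repeated runs within the TTL skip the dry run round trip.

    # opt in for a single invocation (illustrative cache dir and paths)
    PATH="venv/bin:$PATH" script/bqetl dryrun --use-cache --cache-ttl-hours 6 \
        --cache-dir /tmp/bigquery_etl_dryrun_cache --validate-schemas sql/bigquery-etl-integration-test

    # or opt in globally via environment variables, as the CircleCI jobs above do
    export DRYRUN_CACHE_ENABLED=true
    export DRYRUN_CACHE_DIR=/tmp/workspace/dryrun_cache
    PATH="venv/bin:$PATH" script/bqetl dryrun --validate-schemas sql/bigquery-etl-integration-test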