diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py index 23f568528e..0644aab08f 100644 --- a/snuba/manual_jobs/extract_span_data.py +++ b/snuba/manual_jobs/extract_span_data.py @@ -1,3 +1,5 @@ +from typing import Any, Mapping, Optional + from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.manual_jobs import Job, JobLogger, JobSpec @@ -5,9 +7,32 @@ class ExtractSpanData(Job): def __init__(self, job_spec: JobSpec) -> None: + self.__validate_job_params(job_spec.params) super().__init__(job_spec) - def _generate_spans_query(self): + def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None: + assert params + required_params = [ + "organization_ids", + "start_timestamp", + "end_timestamp", + "table_name", + "limit", + "gcp_bucket_name", + "output_file_path", + ] + for param in required_params: + assert param in params + + self._organization_ids = params["organization_ids"] + self._start_timestamp = params["start_timestamp"] + self._end_timestamp = params["end_timestamp"] + self._table_name = params["table_name"] + self._limit = params["limit"] + self._gcp_bucket_name = params["gcp_bucket_name"] + self._output_file_path = params["output_file_path"] + + def _generate_spans_query(self) -> str: # Columns that should not be scrubbed unscrubbed_columns = { "span_id", @@ -48,7 +73,7 @@ def _generate_spans_query(self): "sign", ] - map_columns = [] + map_columns: list[str] = [] for prefix in ["attr_str_", "attr_num_"]: map_columns.extend(f"{prefix}{i}" for i in range(20)) @@ -68,10 +93,10 @@ def _generate_spans_query(self): query = f""" SELECT {', '.join(scrubbed_columns)} - FROM {self.table_name} - WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}') - AND organization_id IN {self.organization_ids} - LIMIT {self.limit} + FROM {self._table_name} + WHERE _sort_timestamp BETWEEN toDateTime('{self._start_timestamp}') AND toDateTime('{self._end_timestamp}') + AND organization_id IN {self._organization_ids} + LIMIT {self._limit} """ return query @@ -81,7 +106,7 @@ def execute(self, logger: JobLogger) -> None: connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) query = f""" - INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}', + INSERT INTO FUNCTION gcs('{self._gcp_bucket_name}/{self._output_file_path}', 'CSVWithNames', 'auto', 'gzip' @@ -92,5 +117,5 @@ def execute(self, logger: JobLogger) -> None: logger.info("Executing query") connection.execute(query=query) logger.info( - f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}" + f"Data written to GCS bucket: {self._gcp_bucket_name}/{self._output_file_path}" ) diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py index 121d3efa24..3ea9e2f7cf 100644 --- a/tests/manual_jobs/test_extract_span_data.py +++ b/tests/manual_jobs/test_extract_span_data.py @@ -93,7 +93,7 @@ def _gen_message( @pytest.mark.clickhouse_db @pytest.mark.redis_db -# @pytest.mark.skip(reason="can't test writing to GCS") +@pytest.mark.skip(reason="can't test writing to GCS") def test_extract_span_data() -> None: BASE_TIME = datetime.utcnow().replace( minute=0, second=0, microsecond=0