diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py
index badcef129c3..23f568528ec 100644
--- a/snuba/manual_jobs/extract_span_data.py
+++ b/snuba/manual_jobs/extract_span_data.py
@@ -1,5 +1,3 @@
-from datetime import datetime
-
 from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
 from snuba.clusters.storage_sets import StorageSetKey
 from snuba.manual_jobs import Job, JobLogger, JobSpec
@@ -10,11 +8,10 @@ def __init__(self, job_spec: JobSpec) -> None:
         super().__init__(job_spec)
 
     def _generate_spans_query(self):
-        # Columns that should not be hashed (numeric and date types)
-        numeric_columns = {
-            "organization_id",
-            "project_id",
+        # Columns that should not be scrubbed
+        unscrubbed_columns = {
             "span_id",
+            "trace_id",
             "parent_span_id",
             "segment_id",
             "is_segment",
@@ -59,14 +56,14 @@ def _generate_spans_query(self):
 
         scrubbed_columns = []
         for col in all_columns:
-            if col in numeric_columns or col.startswith("attr_num"):
+            if col in unscrubbed_columns or col.startswith("attr_num"):
                 scrubbed_columns.append(col)
             elif col.startswith("attr_str"):
                 scrubbed_columns.append(
-                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}"
+                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}_scrubbed"
                 )
             else:
-                scrubbed_columns.append(f"cityHash64({col}) AS {col}")
+                scrubbed_columns.append(f"cityHash64({col}) AS {col}_scrubbed")
 
         query = f"""
             SELECT
@@ -83,13 +80,10 @@ def execute(self, logger: JobLogger) -> None:
         cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
         connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)
 
-        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
-        file_name = f"scrubbed_spans_data_{current_time}.csv.gz"
-
         query = f"""
-            INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}',
+            INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}',
                 'CSVWithNames',
-                '',
+                'auto',
                 'gzip'
             )
             {self._generate_spans_query()}
@@ -98,5 +92,5 @@ def execute(self, logger: JobLogger) -> None:
         logger.info("Executing query")
         connection.execute(query=query)
         logger.info(
-            f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}"
+            f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}"
        )
diff --git a/snuba/utils/hashes.py b/snuba/utils/hashes.py
index b03e94e3395..d9cd7ad2f85 100644
--- a/snuba/utils/hashes.py
+++ b/snuba/utils/hashes.py
@@ -7,14 +7,3 @@ def fnv_1a(b: bytes) -> int:
         res = res ^ byt
         res = (res * fnv_1a_32_prime) & 0xFFFFFFFF  # force 32 bit
     return res
-
-
-def fnv_1a_64(b: bytes) -> int:
-    fnv_1a_64_prime = 1099511628211
-    fnv_1a_64_offset_basis = 14695981039346656037
-
-    res = fnv_1a_64_offset_basis
-    for byt in b:
-        res = res ^ byt
-        res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF  # force 64 bit
-    return res
diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py
index bd2e62c2773..121d3efa248 100644
--- a/tests/manual_jobs/test_extract_span_data.py
+++ b/tests/manual_jobs/test_extract_span_data.py
@@ -8,6 +8,7 @@
 from snuba.datasets.storages.factory import get_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.manual_jobs import JobSpec
+from snuba.manual_jobs.job_status import JobStatus
 from snuba.manual_jobs.runner import run_job
 from tests.helpers import write_raw_unprocessed_events
 
@@ -92,7 +93,7 @@ def _gen_message(
 
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
-@pytest.mark.skip(reason="can't test writing to GCS")
+# @pytest.mark.skip(reason="can't test writing to GCS")
 def test_extract_span_data() -> None:
     BASE_TIME = datetime.utcnow().replace(
         minute=0, second=0, microsecond=0
@@ -107,13 +108,22 @@ def test_extract_span_data() -> None:
 
     write_raw_unprocessed_events(spans_storage, messages)  # type: ignore
 
-    run_job(
-        JobSpec(
-            "jobid",
-            "ExtractSpanData",
-            False,
-            {
-                "organization_ids": [0, 1],
-            },
+    assert (
+        run_job(
+            JobSpec(
+                "jobid",
+                "ExtractSpanData",
+                False,
+                {
+                    "organization_ids": [0, 1],
+                    "start_timestamp": (BASE_TIME - timedelta(minutes=30)).isoformat(),
+                    "end_timestamp": (BASE_TIME + timedelta(hours=24)).isoformat(),
+                    "table_name": "snuba_test.eap_spans_2_local",
+                    "limit": 1000000,
+                    "output_file_path": "scrubbed_spans_data.csv.gz",
+                    "gcp_bucket_name": "test-bucket",
+                },
+            )
         )
+        == JobStatus.FINISHED
     )