diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py
new file mode 100644
index 00000000000..badcef129c3
--- /dev/null
+++ b/snuba/manual_jobs/extract_span_data.py
@@ -0,0 +1,102 @@
+from datetime import datetime
+
+from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
+from snuba.clusters.storage_sets import StorageSetKey
+from snuba.manual_jobs import Job, JobLogger, JobSpec
+
+
+class ExtractSpanData(Job):
+    def __init__(self, job_spec: JobSpec) -> None:
+        super().__init__(job_spec)
+
+    def _generate_spans_query(self):
+        # Columns that should not be hashed (numeric and date types)
+        numeric_columns = {
+            "organization_id",
+            "project_id",
+            "span_id",
+            "parent_span_id",
+            "segment_id",
+            "is_segment",
+            "_sort_timestamp",
+            "start_timestamp",
+            "end_timestamp",
+            "duration_micro",
+            "exclusive_time_micro",
+            "retention_days",
+            "sampling_factor",
+            "sampling_weight",
+            "sign",
+        }
+
+        base_columns = [
+            "organization_id",
+            "project_id",
+            "service",
+            "trace_id",
+            "span_id",
+            "parent_span_id",
+            "segment_id",
+            "segment_name",
+            "is_segment",
+            "_sort_timestamp",
+            "start_timestamp",
+            "end_timestamp",
+            "duration_micro",
+            "exclusive_time_micro",
+            "retention_days",
+            "name",
+            "sampling_factor",
+            "sampling_weight",
+            "sign",
+        ]
+
+        map_columns = []
+        for prefix in ["attr_str_", "attr_num_"]:
+            map_columns.extend(f"{prefix}{i}" for i in range(20))
+
+        all_columns = base_columns + map_columns
+
+        scrubbed_columns = []
+        for col in all_columns:
+            if col in numeric_columns or col.startswith("attr_num"):
+                scrubbed_columns.append(col)
+            elif col.startswith("attr_str"):
+                scrubbed_columns.append(
+                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}"
+                )
+            else:
+                scrubbed_columns.append(f"cityHash64({col}) AS {col}")
+
+        query = f"""
+        SELECT
+            {', '.join(scrubbed_columns)}
+        FROM {self.table_name}
+        WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}')
+        AND organization_id IN {self.organization_ids}
+        LIMIT {self.limit}
+        """
+
+        return query
+
+    def execute(self, logger: JobLogger) -> None:
+        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
+        connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)
+
+        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
+        file_name = f"scrubbed_spans_data_{current_time}.csv.gz"
+
+        query = f"""
+        INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}',
+            'CSVWithNames',
+            '',
+            'gzip'
+        )
+        {self._generate_spans_query()}
+        """
+
+        logger.info("Executing query")
+        connection.execute(query=query)
+        logger.info(
+            f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}"
+        )
diff --git a/snuba/utils/hashes.py b/snuba/utils/hashes.py
index d9cd7ad2f85..b03e94e3395 100644
--- a/snuba/utils/hashes.py
+++ b/snuba/utils/hashes.py
@@ -7,3 +7,14 @@ def fnv_1a(b: bytes) -> int:
         res = res ^ byt
         res = (res * fnv_1a_32_prime) & 0xFFFFFFFF  # force 32 bit
     return res
+
+
+def fnv_1a_64(b: bytes) -> int:
+    fnv_1a_64_prime = 1099511628211
+    fnv_1a_64_offset_basis = 14695981039346656037
+
+    res = fnv_1a_64_offset_basis
+    for byt in b:
+        res = res ^ byt
+        res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF  # force 64 bit
+    return res
diff --git a/tests/manual_jobs/test_extract_span_data.py b/tests/manual_jobs/test_extract_span_data.py
new file mode 100644
index 00000000000..bd2e62c2773
--- /dev/null
+++ b/tests/manual_jobs/test_extract_span_data.py
@@ -0,0 +1,119 @@
+import random
+import uuid
+from datetime import datetime, timedelta
+from typing import Any, Mapping
+
+import pytest
+
+from snuba.datasets.storages.factory import get_storage
+from snuba.datasets.storages.storage_key import StorageKey
+from snuba.manual_jobs import JobSpec
+from snuba.manual_jobs.runner import run_job
+from tests.helpers import write_raw_unprocessed_events
+
+
+def _gen_message(
+    dt: datetime,
+    organization_id: int,
+    measurements: dict[str, dict[str, float]] | None = None,
+    tags: dict[str, str] | None = None,
+) -> Mapping[str, Any]:
+    measurements = measurements or {}
+    tags = tags or {}
+    return {
+        "description": "/api/0/relays/projectconfigs/",
+        "duration_ms": 152,
+        "event_id": "d826225de75d42d6b2f01b957d51f18f",
+        "exclusive_time_ms": 0.228,
+        "is_segment": True,
+        "data": {
+            "sentry.environment": "development",
+            "thread.name": "uWSGIWorker1Core0",
+            "thread.id": "8522009600",
+            "sentry.segment.name": "/api/0/relays/projectconfigs/",
+            "sentry.sdk.name": "sentry.python.django",
+            "sentry.sdk.version": "2.7.0",
+            "my.float.field": 101.2,
+            "my.int.field": 2000,
+            "my.neg.field": -100,
+            "my.neg.float.field": -101.2,
+            "my.true.bool.field": True,
+            "my.false.bool.field": False,
+        },
+        "measurements": {
+            "num_of_spans": {"value": 50.0},
+            "eap.measurement": {"value": random.choice([1, 100, 1000])},
+            **measurements,
+        },
+        "organization_id": organization_id,
+        "origin": "auto.http.django",
+        "project_id": 1,
+        "received": 1721319572.877828,
+        "retention_days": 90,
+        "segment_id": "8873a98879faf06d",
+        "sentry_tags": {
+            "category": "http",
+            "environment": "development",
+            "op": "http.server",
+            "platform": "python",
+            "sdk.name": "sentry.python.django",
+            "sdk.version": "2.7.0",
+            "status": "ok",
+            "status_code": "200",
+            "thread.id": "8522009600",
+            "thread.name": "uWSGIWorker1Core0",
+            "trace.status": "ok",
+            "transaction": "/api/0/relays/projectconfigs/",
+            "transaction.method": "POST",
+            "transaction.op": "http.server",
+            "user": "ip:127.0.0.1",
+        },
+        "span_id": "123456781234567D",
+        "tags": {
+            "http.status_code": "200",
+            "relay_endpoint_version": "3",
+            "relay_id": "88888888-4444-4444-8444-cccccccccccc",
+            "relay_no_cache": "False",
+            "relay_protocol_version": "3",
+            "relay_use_post_or_schedule": "True",
+            "relay_use_post_or_schedule_rejected": "version",
+            "spans_over_limit": "False",
+            "server_name": "blah",
+            "color": random.choice(["red", "green", "blue"]),
+            "location": random.choice(["mobile", "frontend", "backend"]),
+            **tags,
+        },
+        "trace_id": uuid.uuid4().hex,
+        "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
+        "start_timestamp_precise": dt.timestamp(),
+        "end_timestamp_precise": dt.timestamp() + 1,
+    }
+
+
+@pytest.mark.clickhouse_db
+@pytest.mark.redis_db
+@pytest.mark.skip(reason="can't test writing to GCS")
+def test_extract_span_data() -> None:
+    BASE_TIME = datetime.utcnow().replace(
+        minute=0, second=0, microsecond=0
+    ) - timedelta(minutes=180)
+    organization_ids = [0, 1]
+    spans_storage = get_storage(StorageKey("eap_spans"))
+    messages = [
+        _gen_message(BASE_TIME - timedelta(minutes=i), organization_id)
+        for organization_id in organization_ids
+        for i in range(20)
+    ]
+
+    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore
+
+    run_job(
+        JobSpec(
+            "jobid",
+            "ExtractSpanData",
+            False,
+            {
+                "organization_ids": [0, 1],
+            },
+        )
+    )
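
Note on running the job: below is a minimal sketch, not part of the diff, of how `ExtractSpanData` might be invoked outside the skipped test. Only `organization_ids` appears in the test's params; the remaining keys are inferred from the attributes `_generate_spans_query` and `execute` read (`table_name`, `start_timestamp`, `end_timestamp`, `limit`, `gcp_bucket_name`), all values are placeholders, and it assumes the manual-jobs framework copies `JobSpec.params` onto the job instance.

```python
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.runner import run_job

# Hypothetical invocation; every value below is a placeholder.
run_job(
    JobSpec(
        "scrub_spans_example",  # job id, any unique string
        "ExtractSpanData",
        False,  # third positional argument, mirroring the test above
        {
            "organization_ids": [1],
            "start_timestamp": "2024-01-01 00:00:00",
            "end_timestamp": "2024-01-02 00:00:00",
            "table_name": "eap_spans_local",
            "limit": 1_000_000,
            "gcp_bucket_name": "my-scrubbed-spans",
        },
    )
)
```

For a dry run, the same spec could be handed to `ExtractSpanData` directly and `_generate_spans_query()` printed to inspect the scrubbing SQL before anything is written to GCS.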
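
Note on the new hash helper: a quick, self-contained sanity check for `fnv_1a_64`, assuming only the import path shown in the diff. The expected values are the FNV-1a 64-bit offset basis (for empty input) and the published reference vector for the single byte `a`.

```python
from snuba.utils.hashes import fnv_1a_64

# Empty input returns the offset basis: the loop body never runs.
assert fnv_1a_64(b"") == 0xCBF29CE484222325

# Published FNV-1a 64-bit reference vector for the single byte "a".
assert fnv_1a_64(b"a") == 0xAF63DC4C8601EC8C

# The explicit mask keeps every result within 64 bits.
assert fnv_1a_64(b"spans scrubbing") < 2**64
```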