
Commit

write script to send scrubbed data into a gcs bucket
davidtsuk committed Dec 20, 2024
1 parent 4ffbb49 commit a211301
Showing 3 changed files with 232 additions and 0 deletions.
102 changes: 102 additions & 0 deletions snuba/manual_jobs/extract_span_data.py
@@ -0,0 +1,102 @@
from datetime import datetime

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ExtractSpanData(Job):
    def __init__(self, job_spec: JobSpec) -> None:
        super().__init__(job_spec)

    def _generate_spans_query(self):
        # Columns that should not be hashed (numeric and date types)
        numeric_columns = {
            "organization_id",
            "project_id",
            "span_id",
            "parent_span_id",
            "segment_id",
            "is_segment",
            "_sort_timestamp",
            "start_timestamp",
            "end_timestamp",
            "duration_micro",
            "exclusive_time_micro",
            "retention_days",
            "sampling_factor",
            "sampling_weight",
            "sign",
        }

        base_columns = [
            "organization_id",
            "project_id",
            "service",
            "trace_id",
            "span_id",
            "parent_span_id",
            "segment_id",
            "segment_name",
            "is_segment",
            "_sort_timestamp",
            "start_timestamp",
            "end_timestamp",
            "duration_micro",
            "exclusive_time_micro",
            "retention_days",
            "name",
            "sampling_factor",
            "sampling_weight",
            "sign",
        ]

        map_columns = []
        for prefix in ["attr_str_", "attr_num_"]:
            map_columns.extend(f"{prefix}{i}" for i in range(20))

        all_columns = base_columns + map_columns

        scrubbed_columns = []
        for col in all_columns:
            if col in numeric_columns or col.startswith("attr_num"):
                scrubbed_columns.append(col)
            elif col.startswith("attr_str"):
                scrubbed_columns.append(
                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}"
                )
            else:
                scrubbed_columns.append(f"cityHash64({col}) AS {col}")

        query = f"""
        SELECT
            {', '.join(scrubbed_columns)}
        FROM {self.table_name}
        WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}')
        AND organization_id IN {self.organization_ids}
        LIMIT {self.limit}
        """

        return query

    def execute(self, logger: JobLogger) -> None:
        cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
        connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)

        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f"scrubbed_spans_data_{current_time}.csv.gz"

        query = f"""
        INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}',
            'CSVWithNames',
            '',
            'gzip'
        )
        {self._generate_spans_query()}
        """

        logger.info("Executing query")
        connection.execute(query=query)
        logger.info(
            f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}"
        )
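
To make the scrubbing rule concrete, here is a small standalone sketch (not part of the commit) that mirrors the branch logic in _generate_spans_query and prints the SELECT expression produced for a few representative columns; the numeric_columns set is abbreviated for brevity.

# Standalone sketch: mirrors the scrubbing branches in
# ExtractSpanData._generate_spans_query (numeric_columns abbreviated).
numeric_columns = {"organization_id", "span_id", "duration_micro"}


def scrub_expression(col: str) -> str:
    if col in numeric_columns or col.startswith("attr_num"):
        # Numeric/date columns and attr_num_* maps are kept as-is.
        return col
    if col.startswith("attr_str"):
        # attr_str_* maps keep their keys but hash every value.
        return f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}"
    # All other (string) columns are replaced by their cityHash64.
    return f"cityHash64({col}) AS {col}"


for col in ["organization_id", "trace_id", "attr_str_0", "attr_num_0"]:
    print(scrub_expression(col))
# organization_id
# cityHash64(trace_id) AS trace_id
# mapApply((k, v) -> (k, cityHash64(v)), attr_str_0) AS attr_str_0
# attr_num_0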
11 changes: 11 additions & 0 deletions snuba/utils/hashes.py
@@ -7,3 +7,14 @@ def fnv_1a(b: bytes) -> int:
        res = res ^ byt
        res = (res * fnv_1a_32_prime) & 0xFFFFFFFF  # force 32 bit
    return res


def fnv_1a_64(b: bytes) -> int:
    fnv_1a_64_prime = 1099511628211
    fnv_1a_64_offset_basis = 14695981039346656037

    res = fnv_1a_64_offset_basis
    for byt in b:
        res = res ^ byt
        res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF  # force 64 bit
    return res
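
A quick way to sanity-check fnv_1a_64: with empty input the loop never runs, so the function returns the offset basis, and the single byte b"a" matches the published FNV-1a 64-bit test vector. A small usage sketch (not part of the commit):

from snuba.utils.hashes import fnv_1a_64

# Empty input: zero loop iterations, so the result is the offset basis.
assert fnv_1a_64(b"") == 14695981039346656037
# Single byte "a": matches the published FNV-1a 64-bit test vector.
assert fnv_1a_64(b"a") == 0xAF63DC4C8601EC8C
# Deterministic: identical bytes always map to the same 64-bit value.
assert fnv_1a_64(b"scrubbed") == fnv_1a_64(b"scrubbed")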
119 changes: 119 additions & 0 deletions tests/manual_jobs/test_extract_span_data.py
@@ -0,0 +1,119 @@
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.runner import run_job
from tests.helpers import write_raw_unprocessed_events


def _gen_message(
    dt: datetime,
    organization_id: int,
    measurements: dict[str, dict[str, float]] | None = None,
    tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
    measurements = measurements or {}
    tags = tags or {}
    return {
        "description": "/api/0/relays/projectconfigs/",
        "duration_ms": 152,
        "event_id": "d826225de75d42d6b2f01b957d51f18f",
        "exclusive_time_ms": 0.228,
        "is_segment": True,
        "data": {
            "sentry.environment": "development",
            "thread.name": "uWSGIWorker1Core0",
            "thread.id": "8522009600",
            "sentry.segment.name": "/api/0/relays/projectconfigs/",
            "sentry.sdk.name": "sentry.python.django",
            "sentry.sdk.version": "2.7.0",
            "my.float.field": 101.2,
            "my.int.field": 2000,
            "my.neg.field": -100,
            "my.neg.float.field": -101.2,
            "my.true.bool.field": True,
            "my.false.bool.field": False,
        },
        "measurements": {
            "num_of_spans": {"value": 50.0},
            "eap.measurement": {"value": random.choice([1, 100, 1000])},
            **measurements,
        },
        "organization_id": organization_id,
        "origin": "auto.http.django",
        "project_id": 1,
        "received": 1721319572.877828,
        "retention_days": 90,
        "segment_id": "8873a98879faf06d",
        "sentry_tags": {
            "category": "http",
            "environment": "development",
            "op": "http.server",
            "platform": "python",
            "sdk.name": "sentry.python.django",
            "sdk.version": "2.7.0",
            "status": "ok",
            "status_code": "200",
            "thread.id": "8522009600",
            "thread.name": "uWSGIWorker1Core0",
            "trace.status": "ok",
            "transaction": "/api/0/relays/projectconfigs/",
            "transaction.method": "POST",
            "transaction.op": "http.server",
            "user": "ip:127.0.0.1",
        },
        "span_id": "123456781234567D",
        "tags": {
            "http.status_code": "200",
            "relay_endpoint_version": "3",
            "relay_id": "88888888-4444-4444-8444-cccccccccccc",
            "relay_no_cache": "False",
            "relay_protocol_version": "3",
            "relay_use_post_or_schedule": "True",
            "relay_use_post_or_schedule_rejected": "version",
            "spans_over_limit": "False",
            "server_name": "blah",
            "color": random.choice(["red", "green", "blue"]),
            "location": random.choice(["mobile", "frontend", "backend"]),
            **tags,
        },
        "trace_id": uuid.uuid4().hex,
        "start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
        "start_timestamp_precise": dt.timestamp(),
        "end_timestamp_precise": dt.timestamp() + 1,
    }


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
@pytest.mark.skip(reason="can't test writing to GCS")
def test_extract_span_data() -> None:
    BASE_TIME = datetime.utcnow().replace(
        minute=0, second=0, microsecond=0
    ) - timedelta(minutes=180)
    organization_ids = [0, 1]
    spans_storage = get_storage(StorageKey("eap_spans"))
    messages = [
        _gen_message(BASE_TIME - timedelta(minutes=i), organization_id)
        for organization_id in organization_ids
        for i in range(20)
    ]

    write_raw_unprocessed_events(spans_storage, messages)  # type: ignore

    run_job(
        JobSpec(
            "jobid",
            "ExtractSpanData",
            False,
            {
                "organization_ids": [0, 1],
            },
        )
    )
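
The test above only passes organization_ids, but ExtractSpanData also reads self.table_name, self.start_timestamp, self.end_timestamp, self.limit, and self.gcp_bucket_name, so a non-skipped run would presumably need all of them in the params mapping (assuming the Job base class exposes params as attributes). A hypothetical invocation sketch with illustrative values only:

# Hypothetical sketch: parameter names inferred from the attributes that
# ExtractSpanData references; every value below is illustrative only.
run_job(
    JobSpec(
        "jobid",
        "ExtractSpanData",
        False,
        {
            "organization_ids": [0, 1],
            "start_timestamp": "2024-12-01 00:00:00",
            "end_timestamp": "2024-12-02 00:00:00",
            "table_name": "eap_spans_local",  # assumed table name
            "limit": 1_000_000,
            "gcp_bucket_name": "example-scrubbed-spans-bucket",  # placeholder
        },
    )
)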
