Commit 2b5e68b: Update script

davidtsuk committed Jan 6, 2025 · 1 parent a211301
Showing 3 changed files with 28 additions and 35 deletions.
24 changes: 9 additions & 15 deletions snuba/manual_jobs/extract_span_data.py
```diff
@@ -1,5 +1,3 @@
-from datetime import datetime
-
 from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
 from snuba.clusters.storage_sets import StorageSetKey
 from snuba.manual_jobs import Job, JobLogger, JobSpec
@@ -10,11 +8,10 @@ def __init__(self, job_spec: JobSpec) -> None:
         super().__init__(job_spec)
 
     def _generate_spans_query(self):
-        # Columns that should not be hashed (numeric and date types)
-        numeric_columns = {
-            "organization_id",
-            "project_id",
+        # Columns that should not be scrubbed
+        unscrubbed_columns = {
             "span_id",
+            "trace_id",
             "parent_span_id",
             "segment_id",
             "is_segment",
@@ -59,14 +56,14 @@ def _generate_spans_query(self):
 
         scrubbed_columns = []
         for col in all_columns:
-            if col in numeric_columns or col.startswith("attr_num"):
+            if col in unscrubbed_columns or col.startswith("attr_num"):
                 scrubbed_columns.append(col)
             elif col.startswith("attr_str"):
                 scrubbed_columns.append(
-                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}"
+                    f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}_scrubbed"
                 )
             else:
-                scrubbed_columns.append(f"cityHash64({col}) AS {col}")
+                scrubbed_columns.append(f"cityHash64({col}) AS {col}_scrubbed")
 
         query = f"""
             SELECT
```
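Pulled out of the diff, the renamed scrubbing rules read cleanly as a standalone helper. A minimal sketch, assuming a toy column set rather than the real eap_spans schema (`scrub_expression` is my name for the inlined loop body above):

```python
# Sketch of the scrubbing rules in _generate_spans_query; the column
# set is illustrative, the real job derives it from the eap_spans schema.
unscrubbed_columns = {"span_id", "trace_id", "parent_span_id"}

def scrub_expression(col: str) -> str:
    if col in unscrubbed_columns or col.startswith("attr_num"):
        return col  # IDs and numeric attribute maps pass through untouched
    if col.startswith("attr_str"):
        # Hash every value of a string attribute map, keeping the keys.
        return f"mapApply((k, v) -> (k, cityHash64(v)), {col}) AS {col}_scrubbed"
    # Any other column is hashed wholesale.
    return f"cityHash64({col}) AS {col}_scrubbed"

print(scrub_expression("attr_str_0"))
# mapApply((k, v) -> (k, cityHash64(v)), attr_str_0) AS attr_str_0_scrubbed
```

Note that aliasing to `{col}_scrubbed` also renames the headers in the CSVWithNames output, so downstream consumers of the export see new column names. The matching changes on the execute() side: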
```diff
@@ -83,13 +80,10 @@ def execute(self, logger: JobLogger) -> None:
         cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
         connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)
 
-        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
-        file_name = f"scrubbed_spans_data_{current_time}.csv.gz"
-
         query = f"""
-            INSERT INTO FUNCTION gcs('https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}',
+            INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}',
                 'CSVWithNames',
-                '',
+                'auto',
                 'gzip'
             )
             {self._generate_spans_query()}
@@ -98,5 +92,5 @@ def execute(self, logger: JobLogger) -> None:
         logger.info("Executing query")
         connection.execute(query=query)
         logger.info(
-            f"Data written to GCS bucket: https://storage.googleapis.com/{self.gcp_bucket_name}/{file_name}"
+            f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}"
         )
```
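If I'm reading ClickHouse's gcs table function correctly, the positional arguments after the URL are format, structure, and compression, so the change from '' to 'auto' swaps an empty structure hint for automatic schema inference. One thing to watch: the https://storage.googleapis.com/ prefix is no longer prepended, so gcp_bucket_name presumably has to carry the full endpoint outside of tests. Rendered with the stand-in values from the test below, the statement comes out roughly as:

```python
# Rough rendering of the INSERT built in execute(), using the test's
# bucket and path as stand-in values (not production settings).
gcp_bucket_name = "test-bucket"
output_file_path = "scrubbed_spans_data.csv.gz"

query = f"""
    INSERT INTO FUNCTION gcs('{gcp_bucket_name}/{output_file_path}',
        'CSVWithNames',  -- output format
        'auto',          -- infer column structure from the SELECT
        'gzip'           -- compression method
    )
    SELECT ...          -- the scrubbed expressions from _generate_spans_query
"""
print(query)
```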
11 changes: 0 additions & 11 deletions snuba/utils/hashes.py
```diff
@@ -7,14 +7,3 @@ def fnv_1a(b: bytes) -> int:
         res = res ^ byt
         res = (res * fnv_1a_32_prime) & 0xFFFFFFFF  # force 32 bit
     return res
-
-
-def fnv_1a_64(b: bytes) -> int:
-    fnv_1a_64_prime = 1099511628211
-    fnv_1a_64_offset_basis = 14695981039346656037
-
-    res = fnv_1a_64_offset_basis
-    for byt in b:
-        res = res ^ byt
-        res = (res * fnv_1a_64_prime) & 0xFFFFFFFFFFFFFFFF  # force 64 bit
-    return res
```
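The 64-bit variant goes away, presumably because scrubbing now happens ClickHouse-side via cityHash64 rather than in Python. For reference, the surviving 32-bit helper with the standard FNV-1a constants filled back in (the truncated hunk hides the function's first lines, so the constants here are an assumption):

```python
def fnv_1a(b: bytes) -> int:
    # 32-bit FNV-1a: XOR each byte into the state, multiply by the FNV
    # prime, and mask to keep 32-bit overflow semantics.
    fnv_1a_32_prime = 16777619  # standard 32-bit FNV prime (assumed)
    res = 2166136261  # standard 32-bit FNV offset basis (assumed)
    for byt in b:
        res = res ^ byt
        res = (res * fnv_1a_32_prime) & 0xFFFFFFFF  # force 32 bit
    return res

assert fnv_1a(b"") == 2166136261  # empty input returns the offset basis
```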
28 changes: 19 additions & 9 deletions tests/manual_jobs/test_extract_span_data.py
```diff
@@ -8,6 +8,7 @@
 from snuba.datasets.storages.factory import get_storage
 from snuba.datasets.storages.storage_key import StorageKey
 from snuba.manual_jobs import JobSpec
+from snuba.manual_jobs.job_status import JobStatus
 from snuba.manual_jobs.runner import run_job
 from tests.helpers import write_raw_unprocessed_events
 
@@ -92,7 +93,7 @@ def _gen_message(
 
 @pytest.mark.clickhouse_db
 @pytest.mark.redis_db
-@pytest.mark.skip(reason="can't test writing to GCS")
+# @pytest.mark.skip(reason="can't test writing to GCS")
 def test_extract_span_data() -> None:
     BASE_TIME = datetime.utcnow().replace(
         minute=0, second=0, microsecond=0
@@ -107,13 +108,22 @@ def test_extract_span_data() -> None:
 
     write_raw_unprocessed_events(spans_storage, messages)  # type: ignore
 
-    run_job(
-        JobSpec(
-            "jobid",
-            "ExtractSpanData",
-            False,
-            {
-                "organization_ids": [0, 1],
-            },
+    assert (
+        run_job(
+            JobSpec(
+                "jobid",
+                "ExtractSpanData",
+                False,
+                {
+                    "organization_ids": [0, 1],
+                    "start_timestamp": (BASE_TIME - timedelta(minutes=30)).isoformat(),
+                    "end_timestamp": (BASE_TIME + timedelta(hours=24)).isoformat(),
+                    "table_name": "snuba_test.eap_spans_2_local",
+                    "limit": 1000000,
+                    "output_file_path": "scrubbed_spans_data.csv.gz",
+                    "gcp_bucket_name": "test-bucket",
+                },
+            )
         )
+        == JobStatus.FINISHED
     )
```

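With the skip removed, this test doubles as a usage example for the job. My reading of JobSpec's positional arguments, inferred from this call site rather than from the class definition: a job id, the job type name, a dry-run flag, and the params dict consumed by ExtractSpanData. A hypothetical manual invocation:

```python
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.runner import run_job

# Hypothetical invocation; argument roles are inferred from the test
# above, not confirmed against the JobSpec definition.
spec = JobSpec(
    "manual-span-export",  # job id (any unique string, presumably)
    "ExtractSpanData",     # job type
    False,                 # presumably a dry-run flag
    {
        "organization_ids": [0, 1],
        "start_timestamp": "2025-01-05T00:00:00",
        "end_timestamp": "2025-01-06T00:00:00",
        "table_name": "snuba_test.eap_spans_2_local",
        "limit": 1_000_000,
        "output_file_path": "scrubbed_spans_data.csv.gz",
        "gcp_bucket_name": "test-bucket",
    },
)
status = run_job(spec)  # returns a JobStatus; FINISHED on success
```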