fix mypy issues
davidtsuk committed Jan 6, 2025
1 parent 2b5e68b commit 9e8e76a
Showing 2 changed files with 34 additions and 9 deletions.
41 changes: 33 additions & 8 deletions snuba/manual_jobs/extract_span_data.py
@@ -1,13 +1,38 @@
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ExtractSpanData(Job):
def __init__(self, job_spec: JobSpec) -> None:
self.__validate_job_params(job_spec.params)
super().__init__(job_spec)

def _generate_spans_query(self):
def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
assert params
required_params = [
"organization_ids",
"start_timestamp",
"end_timestamp",
"table_name",
"limit",
"gcp_bucket_name",
"output_file_path",
]
for param in required_params:
assert param in params

self._organization_ids = params["organization_ids"]
self._start_timestamp = params["start_timestamp"]
self._end_timestamp = params["end_timestamp"]
self._table_name = params["table_name"]
self._limit = params["limit"]
self._gcp_bucket_name = params["gcp_bucket_name"]
self._output_file_path = params["output_file_path"]

def _generate_spans_query(self) -> str:
# Columns that should not be scrubbed
unscrubbed_columns = {
"span_id",
@@ -48,7 +73,7 @@ def _generate_spans_query(self):
"sign",
]

map_columns = []
map_columns: list[str] = []
for prefix in ["attr_str_", "attr_num_"]:
map_columns.extend(f"{prefix}{i}" for i in range(20))

@@ -68,10 +93,10 @@ def _generate_spans_query(self):
query = f"""
SELECT
{', '.join(scrubbed_columns)}
FROM {self.table_name}
WHERE _sort_timestamp BETWEEN toDateTime('{self.start_timestamp}') AND toDateTime('{self.end_timestamp}')
AND organization_id IN {self.organization_ids}
LIMIT {self.limit}
FROM {self._table_name}
WHERE _sort_timestamp BETWEEN toDateTime('{self._start_timestamp}') AND toDateTime('{self._end_timestamp}')
AND organization_id IN {self._organization_ids}
LIMIT {self._limit}
"""

return query
@@ -81,7 +106,7 @@ def execute(self, logger: JobLogger) -> None:
connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)

query = f"""
INSERT INTO FUNCTION gcs('{self.gcp_bucket_name}/{self.output_file_path}',
INSERT INTO FUNCTION gcs('{self._gcp_bucket_name}/{self._output_file_path}',
'CSVWithNames',
'auto',
'gzip'
@@ -92,5 +117,5 @@ def execute(self, logger: JobLogger) -> None:
logger.info("Executing query")
connection.execute(query=query)
logger.info(
f"Data written to GCS bucket: {self.gcp_bucket_name}/{self.output_file_path}"
f"Data written to GCS bucket: {self._gcp_bucket_name}/{self._output_file_path}"
)
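
The mypy fixes above follow a common pattern: assert on an `Optional[Mapping[Any, Any]]` so mypy narrows it to a non-None mapping before indexing, and give an empty list literal an explicit element type. A minimal standalone sketch of that pattern is below; the `validate_params` helper and its key names are hypothetical illustrations, not snuba's actual `JobSpec` API.

```python
from typing import Any, Mapping, Optional


def validate_params(params: Optional[Mapping[Any, Any]]) -> dict[str, Any]:
    # The assert narrows Optional[Mapping[...]] to Mapping[...] for mypy,
    # so the indexing below type-checks without an explicit None guard.
    assert params
    required = ["table_name", "limit"]  # hypothetical subset of required keys
    for key in required:
        assert key in params, f"missing required param: {key}"
    return dict(params)


# Without an annotation, mypy cannot infer an element type for an empty list;
# the explicit annotation mirrors the `map_columns: list[str] = []` fix above.
columns: list[str] = []
columns.extend(f"attr_str_{i}" for i in range(20))
```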
2 changes: 1 addition & 1 deletion tests/manual_jobs/test_extract_span_data.py
@@ -93,7 +93,7 @@ def _gen_message(

@pytest.mark.clickhouse_db
@pytest.mark.redis_db
# @pytest.mark.skip(reason="can't test writing to GCS")
@pytest.mark.skip(reason="can't test writing to GCS")
def test_extract_span_data() -> None:
BASE_TIME = datetime.utcnow().replace(
minute=0, second=0, microsecond=0
