feat(uptime): snuba table and configuration for new dataset #6686

Closed · wants to merge 20 commits
3 changes: 2 additions & 1 deletion rust_snuba/src/processors/mod.rs
@@ -9,8 +9,8 @@ mod querylog;
mod release_health_metrics;
mod replays;
mod spans;
mod uptime_monitor_checks;
mod utils;

use crate::config::ProcessorConfig;
use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata};
use sentry_arroyo::backends::kafka::types::KafkaPayload;
@@ -54,6 +54,7 @@ define_processing_functions! {
("ProfilesMessageProcessor", "processed-profiles", ProcessingFunctionType::ProcessingFunction(profiles::process_message)),
("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)),
("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)),
("UptimeMonitorChecksProcessor", "snuba-uptime-monitor-checks", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)),
("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)),
("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)),
("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)),
138 changes: 138 additions & 0 deletions rust_snuba/src/processors/uptime_monitor_checks.rs
@@ -0,0 +1,138 @@
use crate::config::ProcessorConfig;
use anyhow::Context;
use chrono::DateTime;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::{InsertBatch, KafkaMessageMetadata};

pub fn process_message(
payload: KafkaPayload,
metadata: KafkaMessageMetadata,
_config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
let payload_bytes = payload.payload().context("Expected payload")?;
let (rows, origin_timestamp) =
deserialize_message(payload_bytes, metadata.partition, metadata.offset)?;

InsertBatch::from_rows(rows, DateTime::from_timestamp(origin_timestamp as i64, 0))
}

pub fn deserialize_message(
payload: &[u8],
partition: u16,
offset: u64,
) -> anyhow::Result<(Vec<UptimeMonitorCheckRow>, f64)> {
let monitor_message: UptimeMonitorCheckMessage = serde_json::from_slice(payload)?;

let rows = vec![UptimeMonitorCheckRow {
organization_id: monitor_message.organization_id,
project_id: monitor_message.project_id,
environment: monitor_message.environment,
uptime_subscription_id: monitor_message.uptime_subscription_id,
uptime_check_id: monitor_message.uptime_check_id,
scheduled_check_time: monitor_message.scheduled_check_time as u32,
timestamp: monitor_message.timestamp as u32,
_sort_timestamp: monitor_message.timestamp as u32,
duration: monitor_message.duration,
region_id: monitor_message.region_id,
check_status: monitor_message.check_status,
check_status_reason: monitor_message.check_status_reason,
http_status_code: monitor_message.http_status_code,
trace_id: monitor_message.trace_id,
retention_days: monitor_message.retention_days,
partition,
offset,
}];

Ok((rows, monitor_message.timestamp))
}

#[derive(Debug, Deserialize)]
struct UptimeMonitorCheckMessage {
organization_id: u64,
project_id: u64,
environment: Option<String>,
uptime_subscription_id: u64,
uptime_check_id: Uuid,
scheduled_check_time: f64,
timestamp: f64,
duration: u64,
region_id: Option<u16>,
check_status: String,
check_status_reason: Option<String>,
http_status_code: u16,
trace_id: Uuid,
retention_days: u16,
}

#[derive(Debug, Default, Serialize)]
pub struct UptimeMonitorCheckRow {
organization_id: u64,
project_id: u64,
environment: Option<String>,
uptime_subscription_id: u64,
uptime_check_id: Uuid,
scheduled_check_time: u32,
timestamp: u32,
_sort_timestamp: u32,
duration: u64,
region_id: Option<u16>,
check_status: String,
check_status_reason: Option<String>,
http_status_code: u16,
trace_id: Uuid,
retention_days: u16,
partition: u16,
offset: u64,
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::DateTime;
use sentry_arroyo::backends::kafka::types::KafkaPayload;

#[test]
fn test_parse_monitor_checkin() {
let data = r#"{
"organization_id": 789,
"project_id": 456,
"environment": "prod",
"uptime_subscription_id": 123,
"uptime_check_id": "550e8400-e29b-41d4-a716-446655440000",
"scheduled_check_time": 1702659277,
"timestamp": 1702659277,
"duration": 100,
"region_id": 42,
"check_status": "ok",
"check_status_reason": "Request successful",
"http_status_code": 200,
"trace_id": "550e8400-e29b-41d4-a716-446655440000",
"retention_days": 30
}"#;

let (rows, timestamp) = deserialize_message(data.as_bytes(), 0, 0).unwrap();
let monitor_row = rows.first().unwrap();

assert_eq!(monitor_row.organization_id, 789);
assert_eq!(monitor_row.project_id, 456);
assert_eq!(monitor_row.environment, Some("prod".to_string()));
assert_eq!(monitor_row.uptime_subscription_id, 123);
assert_eq!(monitor_row.duration, 100);
assert_eq!(monitor_row.timestamp, 1702659277);
assert_eq!(monitor_row._sort_timestamp, 1702659277);
assert_eq!(monitor_row.region_id, Some(42));
assert_eq!(&monitor_row.check_status, "ok");
assert_eq!(
monitor_row.check_status_reason,
Some("Request successful".to_string())
);
assert_eq!(monitor_row.http_status_code, 200);
assert_eq!(monitor_row.retention_days, 30);
assert_eq!(monitor_row.partition, 0);
assert_eq!(monitor_row.offset, 0);
assert_eq!(timestamp, 1702659277.0);
}
}
67 changes: 67 additions & 0 deletions scripts/load_uptime_checks.py
Member: What's the script for?

Member Author: I'm using it to load data locally to test row size, compression ratio, and querying the data using the EAP API.
@@ -0,0 +1,67 @@
#!/usr/bin/env python3

import datetime
import json
import random
import uuid

import requests

# Generate and insert data for uptime checks for each project
base_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

query = """
INSERT INTO default.uptime_monitor_checks_local (
organization_id, project_id, environment, uptime_subscription_id, uptime_check_id,
scheduled_check_time, timestamp, duration, region_id, check_status,
check_status_reason, http_status_code, trace_id, retention_days
) FORMAT JSONEachRow
"""

total_records = 0

for project_id in range(1, 2):
project_data = []
for minute in range(24 * 60 * 90): # 24 hours * 60 minutes * 90 days
timestamp = base_time + datetime.timedelta(minutes=minute)
scheduled_time = timestamp - datetime.timedelta(seconds=random.randint(1, 30))
http_status = (
500 if minute % 100 == 0 else 200
) # Every 100th record gets status 500
check_status = "failure" if http_status == 500 else "success"
project_data.append(
{
"organization_id": 1,
"project_id": project_id,
"environment": "production",
"uptime_subscription_id": random.randint(1, 3) * project_id,
"uptime_check_id": str(uuid.uuid4()),
"scheduled_check_time": scheduled_time.strftime("%Y-%m-%d %H:%M:%S"),
"timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
"duration": random.randint(1, 1000),
"region_id": random.randint(1, 3),
"check_status": check_status,
"check_status_reason": "Timeout error"
if check_status == "failure"
else None,
"http_status_code": http_status,
"trace_id": str(uuid.uuid4()),
"retention_days": 30,
}
)

response = requests.post(
"http://localhost:8123",
params={"query": query},
data="\n".join(json.dumps(row) for row in project_data),
)

if response.status_code == 200:
total_records += len(project_data)
print(
f"Successfully inserted {len(project_data)} records for project {project_id}"
)
else:
print(f"Error inserting data for project {project_id}: {response.text}")

print(f"Total records inserted: {total_records}")
1 change: 1 addition & 0 deletions snuba/clusters/storage_sets.py
@@ -20,6 +20,7 @@
"EVENTS_ANALYTICS_PLATFORM": "events_analytics_platform",
"GROUP_ATTRIBUTES": "group_attributes",
"PROFILE_CHUNKS": "profile_chunks",
"UPTIME_MONITOR_CHECKS": "uptime_monitor_checks",
}


@@ -0,0 +1,6 @@
version: v1
kind: dataset
name: uptime_monitor_checks

entities:
- uptime_monitor_checks
@@ -0,0 +1,42 @@
version: v1
kind: entity
name: uptime_monitor_checks
schema:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: environment, type: String, args: { schema_modifiers: [nullable] } },
{ name: uptime_subscription_id, type: UInt, args: { size: 64 } },
{ name: uptime_check_id, type: UUID },
{ name: scheduled_check_time, type: DateTime },
{ name: timestamp, type: DateTime }, # this is actual_check_time
{ name: duration, type: UInt, args: { size: 64 } },
{ name: region_id, type: UInt, args: { schema_modifiers: [nullable], size: 16 } },
{ name: check_status, type: String },
{ name: check_status_reason, type: String, args: { schema_modifiers: [nullable] } },
{ name: http_status_code, type: UInt, args: { size: 16 } },
{ name: trace_id, type: UUID },
{ name: retention_days, type: UInt, args: { schema_modifiers: [nullable], size: 16 } },
]
required_time_column: timestamp

storages:
- storage: uptime_monitor_checks
is_writable: true

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
- processor: BasicFunctionsProcessor
- processor: TimeSeriesProcessor
args:
time_group_columns:
time: timestamp
time_parse_columns:
- timestamp
validators:
- validator: EntityRequiredColumnValidator
args:
required_filter_columns:
- project_id
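With the dataset and entity registered, the data should be queryable through Snuba's SnQL API. A rough sketch of such a query follows; the local API address (localhost:1218), the /uptime_monitor_checks/snql route, and the tenant_ids payload are assumptions not confirmed by this PR, while the column names, required project_id filter, and timestamp range come from the entity schema and validator above:

```python
import json

import requests

# Hypothetical query against the new entity; endpoint, port, and tenant_ids
# are assumptions about the local Snuba API, column names come from the
# entity schema above.
snql = """
MATCH (uptime_monitor_checks)
SELECT uptime_check_id, check_status, duration
WHERE project_id = 1
AND timestamp >= toDateTime('2024-01-01T00:00:00')
AND timestamp < toDateTime('2024-01-02T00:00:00')
LIMIT 10
"""

response = requests.post(
    "http://localhost:1218/uptime_monitor_checks/snql",
    data=json.dumps(
        {
            "query": snql,
            "tenant_ids": {"referrer": "test", "organization_id": 1},
        }
    ),
)
print(response.json())
```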
@@ -0,0 +1,62 @@
version: v1
kind: writable_storage
name: uptime_monitor_checks
storage:
key: uptime_monitor_checks
set_key: uptime_monitor_checks
readiness_state: limited
schema:
columns:
[
{ name: organization_id, type: UInt, args: { size: 64 } },
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: environment, type: String, args: { schema_modifiers: [nullable] } },
{ name: uptime_subscription_id, type: UInt, args: { size: 64 } },
{ name: uptime_check_id, type: UUID },
{ name: scheduled_check_time, type: DateTime },
{ name: timestamp, type: DateTime }, # this is actual_check_time
{ name: duration, type: UInt, args: { size: 64 } },
{ name: region_id, type: UInt, args: { schema_modifiers: [nullable], size: 16 } },
{ name: check_status, type: String },
{ name: check_status_reason, type: String, args: { schema_modifiers: [nullable] } },
{ name: http_status_code, type: UInt, args: { size: 16 } },
{ name: trace_id, type: UUID },
{ name: retention_days, type: UInt, args: { schema_modifiers: [nullable], size: 16 } },
]
local_table_name: uptime_monitor_checks_local
dist_table_name: uptime_monitor_checks_dist


mandatory_condition_checkers:
- condition: ProjectIdEnforcer

stream_loader:
processor: UptimeMonitorChecksProcessor
default_topic: snuba-uptime-monitor-checks
Member Author: I believe we can actually reuse the uptime-results topic and don't need a separate one for Snuba here.

dlq_topic: snuba-dead-letter-uptime-monitor-checks

allocation_policies:
- name: ConcurrentRateLimitAllocationPolicy
args:
required_tenant_types:
- organization_id
- referrer
- project_id
default_config_overrides:
is_enforced: 0
- name: ReferrerGuardRailPolicy
args:
required_tenant_types:
- referrer
default_config_overrides:
is_enforced: 0
is_active: 0
- name: BytesScannedRejectingPolicy
args:
required_tenant_types:
- organization_id
- project_id
- referrer
default_config_overrides:
is_active: 0
is_enforced: 0
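To exercise the stream_loader end to end, one option is to publish a message shaped like the Rust processor's UptimeMonitorCheckMessage to the configured topic. A rough sketch, assuming a local Kafka broker on localhost:9092 and the snuba-uptime-monitor-checks topic (which, per the review comment above, may ultimately be replaced by the shared uptime-results topic):

```python
import json
import time
import uuid

from confluent_kafka import Producer

# Illustrative payload mirroring the fields of UptimeMonitorCheckMessage in
# rust_snuba/src/processors/uptime_monitor_checks.rs; the broker address and
# topic name are assumptions about the local dev setup.
message = {
    "organization_id": 1,
    "project_id": 1,
    "environment": "production",
    "uptime_subscription_id": 123,
    "uptime_check_id": str(uuid.uuid4()),
    "scheduled_check_time": time.time(),
    "timestamp": time.time(),
    "duration": 125,
    "region_id": 1,
    "check_status": "ok",
    "check_status_reason": None,
    "http_status_code": 200,
    "trace_id": str(uuid.uuid4()),
    "retention_days": 30,
}

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce(
    "snuba-uptime-monitor-checks",
    value=json.dumps(message).encode("utf-8"),
)
producer.flush()
```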
14 changes: 14 additions & 0 deletions snuba/datasets/processors/uptime_monitors_processor.py
@@ -0,0 +1,14 @@
import logging

from snuba import environment
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor
from snuba.utils.metrics.wrapper import MetricsWrapper

logger = logging.getLogger(__name__)

metrics = MetricsWrapper(environment.metrics, "uptime_monitor_checks.processor")


class UptimeMonitorChecksProcessor(RustCompatProcessor):
def __init__(self) -> None:
super().__init__("UptimeMonitorChecksProcessor")
5 changes: 4 additions & 1 deletion snuba/datasets/storages/validator.py
@@ -29,7 +29,10 @@ def check_readiness_state_level_with_migration_group(self) -> None:
)
if storage_readiness_state > group_readiness_state:
raise IncompatibleReadinessStates(
f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state."
(
f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state. "
f"storage_readiness_state={storage_readiness_state}, group_readiness_state={group_readiness_state}"
)
)


5 changes: 5 additions & 0 deletions snuba/migrations/group_loader.py
@@ -176,3 +176,8 @@ def __init__(self) -> None:
class ProfileChunksLoader(DirectoryLoader):
def __init__(self) -> None:
super().__init__("snuba.snuba_migrations.profile_chunks")


class UptimeMonitorChecksLoader(DirectoryLoader):
def __init__(self) -> None:
super().__init__("snuba.snuba_migrations.uptime_monitor_checks")