feat(flags): Update errors processor to index feature flag data #6713

Merged · 17 commits · Jan 23, 2025
Changes from 8 commits
41 changes: 41 additions & 0 deletions rust_snuba/src/processors/errors.rs
@@ -161,6 +161,8 @@ type GenericContext = BTreeMap<String, ContextStringify>;

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Contexts {
#[serde(default, rename = "flags")]
features: Option<FeatureContextEnum>,
#[serde(default)]
replay: Option<ReplayContext>,
#[serde(default)]
@@ -183,6 +185,26 @@ struct TraceContext {
other: GenericContext,
}

#[derive(Debug, Deserialize, JsonSchema)]
#[serde(untagged)]
enum FeatureContextEnum {
Typed(FeatureContext),
#[allow(warnings)]
Untyped(Value),
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct FeatureContext {
#[serde(default)]
values: Vec<FeatureContextItem>,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct FeatureContextItem {
key: Unicodify,
value: Unicodify,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct ReplayContext {
#[serde(default, skip_serializing_if = "Option::is_none")]
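The untagged FeatureContextEnum above is what keeps malformed flags contexts from failing the whole event: serde tries the Typed variant first and falls back to Untyped(Value) for anything else. A minimal self-contained sketch of that fallback, using plain String and serde_json::Value in place of Snuba's Unicodify wrapper (names here are illustrative, not the processor's own):

use serde::Deserialize;
use serde_json::Value;

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum FlagsContext {
    Typed(Flags),   // well-formed: {"values": [{"key": ..., "value": ...}]}
    Untyped(Value), // anything else is captured instead of erroring
}

#[derive(Debug, Deserialize)]
struct Flags {
    #[serde(default)]
    values: Vec<Flag>,
}

#[derive(Debug, Deserialize)]
struct Flag {
    key: String,
    value: Value,
}

fn main() {
    let ok: FlagsContext =
        serde_json::from_str(r#"{"values": [{"key": "abc", "value": true}]}"#).unwrap();
    // An array is not a valid flags context, so it lands in Untyped
    // and is later ignored by the processor.
    let junk: FlagsContext = serde_json::from_str("[1, 2, 3]").unwrap();
    println!("{ok:?}\n{junk:?}");
}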
@@ -384,6 +406,10 @@ struct ErrorRow {
tags_key: Vec<String>,
#[serde(rename = "tags.value")]
tags_value: Vec<String>,
#[serde(rename = "features.key")]
features_key: Vec<String>,
#[serde(rename = "features.value")]
features_value: Vec<String>,
timestamp: u32,
title: String,
#[serde(skip_serializing_if = "Option::is_none")]
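The two new columns follow the same parallel-array convention as the existing tags.key/tags.value pair above. A small sketch, with illustrative names, of the row shape those serde renames produce:

use serde::Serialize;

#[derive(Serialize)]
struct RowFragment {
    #[serde(rename = "features.key")]
    features_key: Vec<String>,
    #[serde(rename = "features.value")]
    features_value: Vec<String>,
}

fn main() {
    let row = RowFragment {
        features_key: vec!["abc".to_string()],
        features_value: vec!["true".to_string()],
    };
    // Prints {"features.key":["abc"],"features.value":["true"]},
    // the same shape the snapshot payloads below assert.
    println!("{}", serde_json::to_string(&row).unwrap());
}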
@@ -563,6 +589,19 @@ impl ErrorRow {
replay_id = Some(rid)
}

// Split feature keys and values into two vectors if the context could be parsed.
let mut features_key = Vec::new();
let mut features_value = Vec::new();

if let Some(FeatureContextEnum::Typed(ctx)) = from_context.features {
ctx.values.into_iter().for_each(|i| {
if let (Some(k), Some(v)) = (i.key.0, i.value.0) {
features_key.push(k);
features_value.push(v);
}
})
};

// Stacktrace.

let exceptions = from
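The split above drops any pair whose key or value could not be stringified, which keeps features.key and features.value aligned. The same logic as a standalone sketch, assuming plain Option<String> where the processor uses its Unicodify wrapper:

fn split_features(
    items: Vec<(Option<String>, Option<String>)>,
) -> (Vec<String>, Vec<String>) {
    let mut keys = Vec::new();
    let mut values = Vec::new();
    for (key, value) in items {
        // Only push when both sides parsed; a one-sided push would
        // misalign the two parallel columns.
        if let (Some(k), Some(v)) = (key, value) {
            keys.push(k);
            values.push(v);
        }
    }
    (keys, values)
}

fn main() {
    let (keys, values) = split_features(vec![
        (Some("abc".into()), Some("true".into())),
        (None, Some("orphaned value".into())),
    ]);
    assert_eq!(keys, vec!["abc"]);
    assert_eq!(values, vec!["true"]);
}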
@@ -668,6 +707,8 @@ impl ErrorRow {
exception_stacks_mechanism_type: stack_mechanism_types,
exception_stacks_type: stack_types,
exception_stacks_value: stack_values,
features_key,
features_value,
group_id: from.group_id,
http_method: from_request.method.0,
http_referer,
(snapshot file, name not shown)
@@ -3,6 +3,13 @@ source: src/processors/mod.rs
expression: diff
---
[
Change {
path: ".<anyOf:0>.2.data.contexts.<anyOf:0>",
change: PropertyAdd {
lhs_additional_properties: true,
added: "flags",
},
},
Change {
path: ".<anyOf:0>.2.data.contexts.<anyOf:0>.trace.<anyOf:0>",
change: PropertyAdd {
(snapshot file, name not shown)
@@ -262,6 +262,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"features.key": [],
"features.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -262,6 +262,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"features.key": [],
"features.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -263,6 +263,8 @@ expression: snapshot_payload
"exception_stacks.value": [
"Some exception."
],
"features.key": [],
"features.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"features.key": [],
"features.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"features.key": [],
"features.value": [],
"group_id": 123123123,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"features.key": [],
"features.value": [],
"group_id": 124,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"features.key": [],
"features.value": [],
"group_id": 123123,
"http_method": null,
"http_referer": null,
(snapshot file, name not shown)
@@ -25,6 +25,8 @@ expression: snapshot_payload
"exception_stacks.mechanism_type": [],
"exception_stacks.type": [],
"exception_stacks.value": [],
"features.key": [],
"features.value": [],
"group_id": 123,
"http_method": null,
"http_referer": null,
70 changes: 70 additions & 0 deletions tests/datasets/test_errors_processor.py
@@ -35,6 +35,7 @@ class ErrorEvent:
trace_sampled: bool | None
environment: str
replay_id: uuid.UUID | None
features: list[Mapping[str, Any]] | None
received_timestamp: datetime
errors: Sequence[Mapping[str, Any]] | None

@@ -135,6 +136,7 @@ def serialize(self) -> tuple[int, str, Mapping[str, Any]]:
]
},
"contexts": {
"flags": {"values": self.features},
"runtime": {
"version": "3.7.6",
"type": "runtime",
@@ -377,8 +379,15 @@ def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]:
"modules.version": ["1.13.2", "0.2.0", "0.6.0"],
"transaction_name": "",
"num_processing_errors": len(self.errors) if self.errors is not None else 0,
"features.key": [],
"features.value": [],
}

if self.features:
for feature in self.features:
expected_result["features.key"].append(feature["key"])
expected_result["features.value"].append(str(feature["value"]).lower())

if self.replay_id:
expected_result["replay_id"] = str(self.replay_id)
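The str(...).lower() above reflects that Python renders booleans as True/False while the Rust processor stringifies JSON scalars in lowercase; a one-line check of that assumption against serde_json, which the processor builds on:

fn main() {
    // JSON booleans display as lowercase, matching the lowered expected value.
    assert_eq!(serde_json::Value::Bool(true).to_string(), "true");
}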

@@ -423,6 +432,7 @@ def __get_error_event(self, timestamp: datetime, recieved: datetime) -> ErrorEvent:
"subdivision": "fake_subdivision",
},
errors=None,
features=None,
)

def test_errors_basic(self) -> None:
@@ -465,6 +475,7 @@ def test_errors_replayid_context(self) -> None:
},
replay_id=uuid.uuid4(),
errors=None,
features=None,
)

payload = message.serialize()
@@ -503,6 +514,7 @@ def test_errors_replayid_tag(self) -> None:
},
replay_id=None,
errors=None,
features=None,
)
replay_id = uuid.uuid4()
payload = message.serialize()
@@ -548,6 +560,7 @@ def test_errors_replayid_tag_and_context(self) -> None:
},
replay_id=replay_id,
errors=None,
features=None,
)

payload = message.serialize()
@@ -591,6 +604,7 @@ def test_errors_replayid_invalid_tag(self) -> None:
},
replay_id=None,
errors=None,
features=None,
)
invalid_replay_id = "imnotavaliduuid"
payload = message.serialize()
@@ -649,6 +663,7 @@ def test_exception_main_thread_true(self) -> None:
]
},
errors=None,
features=None,
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -703,6 +718,7 @@ def test_exception_main_thread_false(self) -> None:
]
},
errors=None,
features=None,
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -743,6 +759,7 @@ def test_trace_sampled(self) -> None:
replay_id=None,
threads=None,
errors=None,
features=None,
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)
@@ -794,6 +811,59 @@ def test_errors_processed(self) -> None:
replay_id=None,
threads=None,
errors=[{"type": "one"}, {"type": "two"}, {"type": "three"}],
features=None,
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

result = message.build_result(meta)
result["num_processing_errors"] = 3

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], ANY
)

# ensure old behavior where data.errors=None won't set 'num_processing_errors'
message.errors = None
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)

result = message.build_result(meta)

assert self.processor.process_message(payload, meta) == InsertBatch(
[result], ANY
)

def test_errors_with_flags(self) -> None:
timestamp, recieved = self.__get_timestamps()
message = ErrorEvent(
event_id=str(uuid.UUID("dcb9d002cac548c795d1c9adbfc68040")),
organization_id=1,
project_id=2,
group_id=100,
platform="python",
message="",
trace_id=str(uuid.uuid4()),
trace_sampled=False,
timestamp=timestamp,
received_timestamp=recieved,
release="1.0.0",
dist="dist",
environment="prod",
email="[email protected]",
ip_address="127.0.0.1",
user_id="myself",
username="me",
geo={
"country_code": "XY",
"region": "fake_region",
"city": "fake_city",
"subdivision": "fake_subdivision",
},
replay_id=None,
threads=None,
errors=[{"type": "one"}, {"type": "two"}, {"type": "three"}],
features=[{"key": "abc", "value": True}],
)
payload = message.serialize()
meta = KafkaMessageMetadata(offset=2, partition=2, timestamp=timestamp)