diff --git a/CHANGELOG.md b/CHANGELOG.md index 378c8c7deb0..3f4bcaf7dc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ **Internal**: - Remove unknown debug image variants in errors. ([#5962](https://github.com/getsentry/relay/pull/5962)) +- Remove support for dynamic sampling reservoir rules. ([#5988](https://github.com/getsentry/relay/pull/5988)) - Bump `sentry-conventions` to 0.6.0-4. ([#5944](https://github.com/getsentry/relay/pull/5944)) - Bump `sqlparser` to 0.62. ([#5964](https://github.com/getsentry/relay/pull/5964)) - Enable compression for forwarded uploads. ([#5965](https://github.com/getsentry/relay/pull/5965)) diff --git a/Cargo.lock b/Cargo.lock index 12d06acba6e..a12e406290c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4887,7 +4887,6 @@ dependencies = [ name = "relay-sampling" version = "26.4.2" dependencies = [ - "anyhow", "chrono", "insta", "rand 0.9.4", @@ -4896,8 +4895,6 @@ dependencies = [ "relay-event-schema", "relay-log", "relay-protocol", - "relay-redis", - "relay-statsd", "serde", "serde_json", "similar-asserts", diff --git a/relay-sampling/Cargo.toml b/relay-sampling/Cargo.toml index e329f7dee6f..b382a5fb194 100644 --- a/relay-sampling/Cargo.toml +++ b/relay-sampling/Cargo.toml @@ -9,15 +9,10 @@ edition = "2024" license-file = "../LICENSE.md" publish = false -[features] -default = [] -redis = ["dep:anyhow", "relay-redis/impl", "dep:relay-statsd"] - [lints] workspace = true [dependencies] -anyhow = { workspace = true, optional = true } chrono = { workspace = true, features = ["now"] } rand = { workspace = true } rand_pcg = { workspace = true } @@ -25,8 +20,6 @@ relay-base-schema = { workspace = true } relay-event-schema = { workspace = true } relay-log = { workspace = true } relay-protocol = { workspace = true } -relay-redis = { workspace = true, optional = true } -relay-statsd = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } uuid = { workspace = true } diff --git a/relay-sampling/src/config.rs b/relay-sampling/src/config.rs index 12123efe7f3..e2d44220127 100644 --- a/relay-sampling/src/config.rs +++ b/relay-sampling/src/config.rs @@ -154,15 +154,6 @@ pub enum SamplingValue { value: f64, }, - /// A reservoir limit. - /// - /// A rule with a reservoir limit will be sampled if the rule have been matched fewer times - /// than the limit. - Reservoir { - /// The limit of how many times this rule will be sampled before this rule is invalid. - limit: i64, - }, - /// A minimum sample rate. /// /// The sample rate specified in the rule will be used as a minimum over the otherwise used diff --git a/relay-sampling/src/evaluation.rs b/relay-sampling/src/evaluation.rs index 155f25f1dc1..306a949d5f4 100644 --- a/relay-sampling/src/evaluation.rs +++ b/relay-sampling/src/evaluation.rs @@ -1,26 +1,18 @@ //! Evaluation of dynamic sampling rules. -use std::collections::BTreeMap; use std::fmt; use std::num::ParseIntError; use std::ops::ControlFlow; -use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use rand::Rng; use rand::distr::StandardUniform; use rand_pcg::Pcg32; -#[cfg(feature = "redis")] -use relay_base_schema::organization::OrganizationId; use relay_protocol::Getter; -#[cfg(feature = "redis")] -use relay_redis::AsyncRedisClient; use serde::Serialize; use uuid::Uuid; use crate::config::{RuleId, SamplingRule, SamplingValue}; -#[cfg(feature = "redis")] -use crate::redis_sampling::{self, ReservoirRuleKey}; /// Generates a pseudo random number by seeding the generator with the given id. /// @@ -31,161 +23,23 @@ fn pseudo_random_from_seed(seed: Uuid) -> f64 { generator.sample(StandardUniform) } -/// The amount of matches for each reservoir rule in a given project. -pub type ReservoirCounters = Arc>>; - -/// Utility for evaluating reservoir-based sampling rules. -/// -/// A "reservoir limit" rule samples every match until its limit is reached, after which -/// the rule is disabled. -/// -/// This utility uses a dual-counter system for enforcing this limit: -/// -/// - Local Counter: Each relay instance maintains a local counter to track sampled events. -/// -/// - Redis Counter: For processing relays, a Redis-based counter provides synchronization -/// across multiple relay-instances. When incremented, the Redis counter returns the current global -/// count for the given rule, which is then used to update the local counter. -#[derive(Debug)] -pub struct ReservoirEvaluator<'a> { - counters: ReservoirCounters, - #[cfg(feature = "redis")] - org_id_and_client: Option<(OrganizationId, &'a AsyncRedisClient)>, - // Using PhantomData because the lifetimes are behind a feature flag. - _phantom: std::marker::PhantomData<&'a ()>, -} - -impl ReservoirEvaluator<'_> { - /// Constructor for [`ReservoirEvaluator`]. - pub fn new(counters: ReservoirCounters) -> Self { - Self { - counters, - #[cfg(feature = "redis")] - org_id_and_client: None, - _phantom: std::marker::PhantomData, - } - } - - /// Gets shared ownership of the reservoir counters. - pub fn counters(&self) -> ReservoirCounters { - Arc::clone(&self.counters) - } - - #[cfg(feature = "redis")] - async fn redis_incr( - &self, - key: &ReservoirRuleKey, - client: &AsyncRedisClient, - rule_expiry: Option<&DateTime>, - ) -> relay_redis::Result { - relay_statsd::metric!(timer(crate::statsd::SamplingTimers::RedisReservoir), { - let mut connection = client.get_connection().await?; - - let val = redis_sampling::increment_redis_reservoir_count(&mut connection, key).await?; - redis_sampling::set_redis_expiry(&mut connection, key, rule_expiry).await?; - - Ok(val) - }) - } - - /// Evaluates a reservoir rule, returning `true` if it should be sampled. - pub fn incr_local(&self, rule: RuleId, limit: i64) -> bool { - let Ok(mut map_guard) = self.counters.lock() else { - relay_log::error!("failed to lock reservoir counter mutex"); - return false; - }; - - let counter_value = map_guard.entry(rule).or_insert(0); - - if *counter_value < limit { - *counter_value += 1; - true - } else { - false - } - } - - /// Evaluates a reservoir rule, returning `true` if it should be sampled. - pub async fn evaluate( - &self, - rule: RuleId, - limit: i64, - _rule_expiry: Option<&DateTime>, - ) -> bool { - #[cfg(feature = "redis")] - if let Some((org_id, client)) = self.org_id_and_client { - if let Ok(guard) = self.counters.lock() - && *guard.get(&rule).unwrap_or(&0) > limit - { - return false; - } - - let key = ReservoirRuleKey::new(org_id, rule); - let redis_count = match self.redis_incr(&key, client, _rule_expiry).await { - Ok(redis_count) => redis_count, - Err(e) => { - relay_log::error!( - error = &e as &dyn std::error::Error, - "failed to increment reservoir rule" - ); - return false; - } - }; - - if let Ok(mut map_guard) = self.counters.lock() { - // If the rule isn't present, it has just been cleaned up by a project state update. - // In that case, it is no longer relevant so we ignore it. - if let Some(value) = map_guard.get_mut(&rule) { - *value = redis_count.max(*value); - } - } - return redis_count <= limit; - } - - self.incr_local(rule, limit) - } -} - -#[cfg(feature = "redis")] -impl<'a> ReservoirEvaluator<'a> { - /// Sets the Redis pool and organization ID for the [`ReservoirEvaluator`]. - /// - /// These values are needed to synchronize with Redis. - pub fn set_redis(&mut self, org_id: OrganizationId, client: &'a AsyncRedisClient) { - self.org_id_and_client = Some((org_id, client)); - } -} - /// State machine for dynamic sampling. #[derive(Debug)] -pub struct SamplingEvaluator<'a> { +pub struct SamplingEvaluator { now: DateTime, rule_ids: Vec, factor: f64, minimum_sample_rate: Option, - reservoir: Option<&'a ReservoirEvaluator<'a>>, } -impl<'a> SamplingEvaluator<'a> { - /// Constructs an evaluator with reservoir sampling. - pub fn new_with_reservoir(now: DateTime, reservoir: &'a ReservoirEvaluator<'a>) -> Self { - Self { - now, - rule_ids: vec![], - factor: 1.0, - minimum_sample_rate: None, - reservoir: Some(reservoir), - } - } - - /// Constructs an evaluator without reservoir sampling. +impl SamplingEvaluator { + /// Constructs an evaluator. pub fn new(now: DateTime) -> Self { Self { now, rule_ids: vec![], factor: 1.0, minimum_sample_rate: None, - reservoir: None, } } @@ -200,7 +54,7 @@ impl<'a> SamplingEvaluator<'a> { /// - If this value is returned and there are no more rules to evaluate, it should be interpreted as "no match." /// /// - `ControlFlow::Break`: Indicates that one or more rules have successfully matched. - pub async fn match_rules<'b, I, G>( + pub fn match_rules<'a, I, G>( mut self, seed: Uuid, instance: &G, @@ -208,14 +62,14 @@ impl<'a> SamplingEvaluator<'a> { ) -> ControlFlow where G: Getter, - I: Iterator, + I: Iterator, { for rule in rules { if !rule.time_range.contains(self.now) || !rule.condition.matches(instance) { continue; }; - if let Some(sample_rate) = self.try_compute_sample_rate(rule).await { + if let Some(sample_rate) = self.try_compute_sample_rate(rule) { return ControlFlow::Break(SamplingMatch::new(sample_rate, seed, self.rule_ids)); }; } @@ -230,7 +84,7 @@ impl<'a> SamplingEvaluator<'a> { /// - `None` if the sampling rule is invalid, expired, or if the final sample rate has not been /// determined yet. /// - `Some` if the computed sample rate should be applied directly. - async fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option { + fn try_compute_sample_rate(&mut self, rule: &SamplingRule) -> Option { match rule.sampling_value { SamplingValue::Factor { value } => { self.factor *= rule.apply_decaying_fn(value, self.now)?; @@ -245,21 +99,6 @@ impl<'a> SamplingEvaluator<'a> { self.rule_ids.push(rule.id); Some(adjusted) } - SamplingValue::Reservoir { limit } => { - let reservoir = self.reservoir?; - if !reservoir - .evaluate(rule.id, limit, rule.time_range.end.as_ref()) - .await - { - return None; - } - - // Clearing the previously matched rules because reservoir overrides them. - self.rule_ids.clear(); - self.rule_ids.push(rule.id); - // If the reservoir has not yet reached its limit, we want to sample 100%. - Some(1.0) - } SamplingValue::MinimumSampleRate { value } => { if self.minimum_sample_rate.is_none() { self.minimum_sample_rate = Some(rule.apply_decaying_fn(value, self.now)?); @@ -430,24 +269,13 @@ mod tests { use super::*; - fn mock_reservoir_evaluator(vals: Vec<(u32, i64)>) -> ReservoirEvaluator<'static> { - let mut map = BTreeMap::default(); - - for (rule_id, count) in vals { - map.insert(RuleId(rule_id), count); - } - - let map = Arc::new(Mutex::new(map)); - - ReservoirEvaluator::new(map) - } - /// Helper to extract the sampling match after evaluating rules. - async fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch { - match SamplingEvaluator::new(Utc::now()) - .match_rules(Uuid::default(), instance, rules.iter()) - .await - { + fn get_sampling_match(rules: &[SamplingRule], instance: &impl Getter) -> SamplingMatch { + match SamplingEvaluator::new(Utc::now()).match_rules( + Uuid::default(), + instance, + rules.iter(), + ) { ControlFlow::Break(sampling_match) => sampling_match, ControlFlow::Continue(_) => panic!("no match found"), } @@ -458,26 +286,12 @@ mod tests { } /// Helper to check if certain rules are matched on. - async fn matches_rule_ids( - rule_ids: &[u32], - rules: &[SamplingRule], - instance: &impl Getter, - ) -> bool { + fn matches_rule_ids(rule_ids: &[u32], rules: &[SamplingRule], instance: &impl Getter) -> bool { let matched_rule_ids = MatchedRuleIds(rule_ids.iter().map(|num| RuleId(*num)).collect()); - let sampling_match = get_sampling_match(rules, instance).await; + let sampling_match = get_sampling_match(rules, instance); matched_rule_ids == sampling_match.matched_rules } - // Helper method to "unwrap" the sampling match. - fn get_matched_rules( - sampling_evaluator: &ControlFlow, - ) -> Vec { - match sampling_evaluator { - ControlFlow::Continue(_) => panic!("expected a sampling match"), - ControlFlow::Break(m) => m.matched_rules.0.iter().map(|rule_id| rule_id.0).collect(), - } - } - /// Helper function to create a dsc with the provided getter-values set. fn mocked_dsc_with_getter_values( paths_and_values: Vec<(&str, &str)>, @@ -510,34 +324,14 @@ mod tests { dsc } - async fn is_match( - now: DateTime, - rule: &SamplingRule, - dsc: &DynamicSamplingContext, - ) -> bool { + fn is_match(now: DateTime, rule: &SamplingRule, dsc: &DynamicSamplingContext) -> bool { SamplingEvaluator::new(now) .match_rules(Uuid::default(), dsc, std::iter::once(rule)) - .await .is_break() } - #[tokio::test] - async fn test_reservoir_evaluator_limit() { - let evaluator = mock_reservoir_evaluator(vec![(1, 0)]); - - let rule = RuleId(1); - let limit = 3; - - assert!(evaluator.evaluate(rule, limit, None).await); - assert!(evaluator.evaluate(rule, limit, None).await); - assert!(evaluator.evaluate(rule, limit, None).await); - // After 3 samples we have reached the limit, and the following rules are not sampled. - assert!(!evaluator.evaluate(rule, limit, None).await); - assert!(!evaluator.evaluate(rule, limit, None).await); - } - - #[tokio::test] - async fn test_sample_rate_compounding() { + #[test] + fn test_sample_rate_compounding() { let rules = simple_sampling_rules(vec![ (RuleCondition::all(), SamplingValue::Factor { value: 0.8 }), (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }), @@ -549,11 +343,11 @@ mod tests { let dsc = mocked_dsc_with_getter_values(vec![]); // 0.8 * 0.5 * 0.25 == 0.1 - assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.1); + assert_eq!(get_sampling_match(&rules, &dsc).sample_rate(), 0.1); } - #[tokio::test] - async fn test_minimum_sample_rate() { + #[test] + fn test_minimum_sample_rate() { let rules = simple_sampling_rules(vec![ (RuleCondition::all(), SamplingValue::Factor { value: 1.5 }), ( @@ -573,7 +367,7 @@ mod tests { let dsc = mocked_dsc_with_getter_values(vec![]); // max(0.05, 0.5) * 1.5 = 0.75 - assert_eq!(get_sampling_match(&rules, &dsc).await.sample_rate(), 0.75); + assert_eq!(get_sampling_match(&rules, &dsc).sample_rate(), 0.75); } fn mocked_sampling_rule() -> SamplingRule { @@ -606,62 +400,9 @@ mod tests { vec } - /// Tests that reservoir rules override the other rules. - /// - /// Here all 3 rules are a match. But when the reservoir - /// rule (id = 1) has not yet reached its limit of "2" matches, the - /// previous rule(s) will not be present in the matched rules output. - /// After the limit has been reached, the reservoir rule is ignored - /// and the output is the two other rules (id = 0, id = 2). - #[tokio::test] - async fn test_reservoir_override() { - let dsc = mocked_dsc_with_getter_values(vec![]); - let rules = simple_sampling_rules(vec![ - (RuleCondition::all(), SamplingValue::Factor { value: 0.5 }), - // The reservoir has a limit of 2, meaning it should be sampled twice - // before it is ignored. - (RuleCondition::all(), SamplingValue::Reservoir { limit: 2 }), - ( - RuleCondition::all(), - SamplingValue::SampleRate { value: 0.5 }, - ), - ]); - - // The reservoir keeps the counter state behind a mutex, which is how it - // shares state among multiple evaluator instances. - let reservoir = mock_reservoir_evaluator(vec![]); - - let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); - let matched_rules = get_matched_rules( - &evaluator - .match_rules(Uuid::default(), &dsc, rules.iter()) - .await, - ); - // Reservoir rule overrides 0 and 2. - assert_eq!(&matched_rules, &[1]); - - let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); - let matched_rules = get_matched_rules( - &evaluator - .match_rules(Uuid::default(), &dsc, rules.iter()) - .await, - ); - // Reservoir rule overrides 0 and 2. - assert_eq!(&matched_rules, &[1]); - - let evaluator = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); - let matched_rules = get_matched_rules( - &evaluator - .match_rules(Uuid::default(), &dsc, rules.iter()) - .await, - ); - // Reservoir rule reached its limit, rule 0 and 2 are now matched instead. - assert_eq!(&matched_rules, &[0, 2]); - } - /// Checks that rules don't match if the time is outside the time range. - #[tokio::test] - async fn test_expired_rules() { + #[test] + fn test_expired_rules() { let rule = SamplingRule { condition: RuleCondition::all(), sampling_value: SamplingValue::SampleRate { value: 1.0 }, @@ -678,27 +419,33 @@ mod tests { // Baseline test. let within_timerange = Utc.with_ymd_and_hms(1970, 10, 11, 0, 0, 0).unwrap(); - let res = SamplingEvaluator::new(within_timerange) - .match_rules(Uuid::default(), &dsc, [rule.clone()].iter()) - .await; + let res = SamplingEvaluator::new(within_timerange).match_rules( + Uuid::default(), + &dsc, + [rule.clone()].iter(), + ); assert!(evaluation_is_match(res)); let before_timerange = Utc.with_ymd_and_hms(1969, 1, 1, 0, 0, 0).unwrap(); - let res = SamplingEvaluator::new(before_timerange) - .match_rules(Uuid::default(), &dsc, [rule.clone()].iter()) - .await; + let res = SamplingEvaluator::new(before_timerange).match_rules( + Uuid::default(), + &dsc, + [rule.clone()].iter(), + ); assert!(!evaluation_is_match(res)); let after_timerange = Utc.with_ymd_and_hms(1971, 1, 1, 0, 0, 0).unwrap(); - let res = SamplingEvaluator::new(after_timerange) - .match_rules(Uuid::default(), &dsc, [rule].iter()) - .await; + let res = SamplingEvaluator::new(after_timerange).match_rules( + Uuid::default(), + &dsc, + [rule].iter(), + ); assert!(!evaluation_is_match(res)); } /// Checks that `SamplingValueEvaluator` correctly matches the right rules. - #[tokio::test] - async fn test_condition_matching() { + #[test] + fn test_condition_matching() { let rules = simple_sampling_rules(vec![ ( RuleCondition::glob("trace.transaction", "*healthcheck*"), @@ -730,15 +477,15 @@ mod tests { // early return of first rule let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "foohealthcheckbar")]); - assert!(matches_rule_ids(&[0], &rules, &dsc).await); + assert!(matches_rule_ids(&[0], &rules, &dsc)); // early return of second rule let dsc = mocked_dsc_with_getter_values(vec![("trace.environment", "dev")]); - assert!(matches_rule_ids(&[1], &rules, &dsc).await); + assert!(matches_rule_ids(&[1], &rules, &dsc)); // factor match third rule and early return sixth rule let dsc = mocked_dsc_with_getter_values(vec![("trace.transaction", "raboof")]); - assert!(matches_rule_ids(&[2, 5], &rules, &dsc).await); + assert!(matches_rule_ids(&[2, 5], &rules, &dsc)); // factor match third rule and early return fourth rule let dsc = mocked_dsc_with_getter_values(vec![ @@ -746,7 +493,7 @@ mod tests { ("trace.release", "1.1.1"), ("trace.user.segment", "vip"), ]); - assert!(matches_rule_ids(&[2, 3], &rules, &dsc).await); + assert!(matches_rule_ids(&[2, 3], &rules, &dsc)); // factor match third, fifth rule and early return sixth rule let dsc = mocked_dsc_with_getter_values(vec![ @@ -754,14 +501,14 @@ mod tests { ("trace.release", "1.1.1"), ("trace.environment", "prod"), ]); - assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc).await); + assert!(matches_rule_ids(&[2, 4, 5], &rules, &dsc)); // factor match fifth and early return sixth rule let dsc = mocked_dsc_with_getter_values(vec![ ("trace.release", "1.1.1"), ("trace.environment", "prod"), ]); - assert!(matches_rule_ids(&[4, 5], &rules, &dsc).await); + assert!(matches_rule_ids(&[4, 5], &rules, &dsc)); } #[test] @@ -807,14 +554,12 @@ mod tests { assert!(MatchedRuleIds::parse("a,b").is_err()); } - #[tokio::test] + #[test] /// Tests that no match is done when there are no matching rules. - async fn test_get_sampling_match_result_with_no_match() { + fn test_get_sampling_match_result_with_no_match() { let dsc = mocked_dsc_with_getter_values(vec![]); - let res = SamplingEvaluator::new(Utc::now()) - .match_rules(Uuid::default(), &dsc, [].iter()) - .await; + let res = SamplingEvaluator::new(Utc::now()).match_rules(Uuid::default(), &dsc, [].iter()); assert!(!evaluation_is_match(res)); } @@ -823,8 +568,8 @@ mod tests { /// time is out of bounds of the time range. /// When the `start` or `end` of the range is missing, it defaults to always include /// times before the `end` or after the `start`, respectively. - #[tokio::test] - async fn test_sample_rate_valid_time_range() { + #[test] + fn test_sample_rate_valid_time_range() { let dsc = mocked_dsc_with_getter_values(vec![]); let time_range = TimeRange { start: Some(Utc.with_ymd_and_hms(1970, 1, 1, 0, 0, 0).unwrap()), @@ -845,47 +590,45 @@ mod tests { }; // [start..end] - assert!(!is_match(before_time_range, &rule, &dsc).await); - assert!(is_match(during_time_range, &rule, &dsc).await); - assert!(!is_match(after_time_range, &rule, &dsc).await); + assert!(!is_match(before_time_range, &rule, &dsc)); + assert!(is_match(during_time_range, &rule, &dsc)); + assert!(!is_match(after_time_range, &rule, &dsc)); // [start..] let mut rule_without_end = rule.clone(); rule_without_end.time_range.end = None; - assert!(!is_match(before_time_range, &rule_without_end, &dsc).await); - assert!(is_match(during_time_range, &rule_without_end, &dsc).await); - assert!(is_match(after_time_range, &rule_without_end, &dsc).await); + assert!(!is_match(before_time_range, &rule_without_end, &dsc)); + assert!(is_match(during_time_range, &rule_without_end, &dsc)); + assert!(is_match(after_time_range, &rule_without_end, &dsc)); // [..end] let mut rule_without_start = rule.clone(); rule_without_start.time_range.start = None; - assert!(is_match(before_time_range, &rule_without_start, &dsc).await); - assert!(is_match(during_time_range, &rule_without_start, &dsc).await); - assert!(!is_match(after_time_range, &rule_without_start, &dsc).await); + assert!(is_match(before_time_range, &rule_without_start, &dsc)); + assert!(is_match(during_time_range, &rule_without_start, &dsc)); + assert!(!is_match(after_time_range, &rule_without_start, &dsc)); // [..] let mut rule_without_range = rule.clone(); rule_without_range.time_range = TimeRange::default(); - assert!(is_match(before_time_range, &rule_without_range, &dsc).await); - assert!(is_match(during_time_range, &rule_without_range, &dsc).await); - assert!(is_match(after_time_range, &rule_without_range, &dsc).await); + assert!(is_match(before_time_range, &rule_without_range, &dsc)); + assert!(is_match(during_time_range, &rule_without_range, &dsc)); + assert!(is_match(after_time_range, &rule_without_range, &dsc)); } /// Checks that `validate_match` yields the correct controlflow given the SamplingValue variant. - #[tokio::test] - async fn test_validate_match() { + #[test] + fn test_validate_match() { let mut rule = mocked_sampling_rule(); - - let reservoir = ReservoirEvaluator::new(ReservoirCounters::default()); - let mut eval = SamplingEvaluator::new_with_reservoir(Utc::now(), &reservoir); + let mut eval = SamplingEvaluator::new(Utc::now()); rule.sampling_value = SamplingValue::SampleRate { value: 1.0 }; - assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0)); + assert_eq!(eval.try_compute_sample_rate(&rule), Some(1.0)); rule.sampling_value = SamplingValue::Factor { value: 1.0 }; - assert_eq!(eval.try_compute_sample_rate(&rule).await, None); + assert_eq!(eval.try_compute_sample_rate(&rule), None); - rule.sampling_value = SamplingValue::Reservoir { limit: 1 }; - assert_eq!(eval.try_compute_sample_rate(&rule).await, Some(1.0)); + rule.sampling_value = SamplingValue::MinimumSampleRate { value: 1.0 }; + assert_eq!(eval.try_compute_sample_rate(&rule), None); } } diff --git a/relay-sampling/src/lib.rs b/relay-sampling/src/lib.rs index b633ffb0466..97125a201b6 100644 --- a/relay-sampling/src/lib.rs +++ b/relay-sampling/src/lib.rs @@ -76,10 +76,6 @@ pub mod config; pub mod dsc; pub mod evaluation; -#[cfg(feature = "redis")] -mod redis_sampling; -#[cfg(feature = "redis")] -mod statsd; pub use config::SamplingConfig; pub use dsc::DynamicSamplingContext; diff --git a/relay-sampling/src/redis_sampling.rs b/relay-sampling/src/redis_sampling.rs deleted file mode 100644 index 401a17fcddf..00000000000 --- a/relay-sampling/src/redis_sampling.rs +++ /dev/null @@ -1,53 +0,0 @@ -use chrono::{DateTime, Utc}; -use relay_base_schema::organization::OrganizationId; -use relay_redis::AsyncRedisConnection; - -use crate::config::RuleId; - -pub struct ReservoirRuleKey(String); - -impl ReservoirRuleKey { - pub fn new(org_id: OrganizationId, rule_id: RuleId) -> Self { - Self(format!("reservoir:{org_id}:{rule_id}")) - } - - fn as_str(&self) -> &str { - self.0.as_str() - } -} - -/// Increments the reservoir count for a given rule in redis. -/// -/// - INCR docs: . -/// - If the counter doesn't exist in redis, a new one will be inserted. -pub async fn increment_redis_reservoir_count( - connection: &mut AsyncRedisConnection, - key: &ReservoirRuleKey, -) -> relay_redis::Result { - let val = relay_redis::redis::cmd("INCR") - .arg(key.as_str()) - .query_async(connection) - .await?; - - Ok(val) -} - -/// Sets the expiry time for a reservoir rule count. -pub async fn set_redis_expiry( - connection: &mut AsyncRedisConnection, - key: &ReservoirRuleKey, - rule_expiry: Option<&DateTime>, -) -> relay_redis::Result<()> { - let now = Utc::now().timestamp(); - let expiry_time = rule_expiry - .map(|rule_expiry| rule_expiry.timestamp() + 60) - .unwrap_or_else(|| now + 86400); - - relay_redis::redis::cmd("EXPIRE") - .arg(key.as_str()) - .arg(expiry_time - now) - .query_async::<()>(connection) - .await?; - - Ok(()) -} diff --git a/relay-sampling/src/statsd.rs b/relay-sampling/src/statsd.rs deleted file mode 100644 index cde2bae5179..00000000000 --- a/relay-sampling/src/statsd.rs +++ /dev/null @@ -1,14 +0,0 @@ -use relay_statsd::TimerMetric; - -pub enum SamplingTimers { - /// Amount of time it took to increment the Redis reservoir. - RedisReservoir, -} - -impl TimerMetric for SamplingTimers { - fn name(&self) -> &'static str { - match self { - Self::RedisReservoir => "sampling.redis.reservoir", - } - } -} diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 54b1e19f6b6..08e548613ca 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -22,7 +22,6 @@ processing = [ "relay-metrics/redis", "relay-quotas/redis", "relay-redis/impl", - "relay-sampling/redis", ] [lints] diff --git a/relay-server/src/processing/errors/dynamic_sampling.rs b/relay-server/src/processing/errors/dynamic_sampling.rs index c72984d8fa2..9e30fb6b0b6 100644 --- a/relay-server/src/processing/errors/dynamic_sampling.rs +++ b/relay-server/src/processing/errors/dynamic_sampling.rs @@ -13,7 +13,7 @@ use crate::utils::SamplingResult; /// /// The function validates the DSC as well as a tagging the error event with the sampling decision /// of the associated trace. -pub async fn apply(error: &mut Managed, ctx: Context<'_>) { +pub fn apply(error: &mut Managed, ctx: Context<'_>) { // Only run in processing to not compute the decision multiple times and it is the most // accurate place, as other Relays may have unsupported inbound filter or sampling configs. if !ctx.is_processing() { @@ -27,7 +27,7 @@ pub async fn apply(error: &mut Managed, ctx: Context<'_>) { return; } - if let Some(sampled) = is_trace_fully_sampled(error, ctx).await { + if let Some(sampled) = is_trace_fully_sampled(error, ctx) { error.modify(|error, _| tag_error_with_sampling_decision(error, sampled)); }; } @@ -55,7 +55,7 @@ fn tag_error_with_sampling_decision(error: &mut ExpandedError, sampled: bool) { /// Runs dynamic sampling if the dsc and root project state are not None and returns whether the /// transactions received with such dsc and project state would be kept or dropped by dynamic /// sampling. -async fn is_trace_fully_sampled(error: &ExpandedError, ctx: Context<'_>) -> Option { +fn is_trace_fully_sampled(error: &ExpandedError, ctx: Context<'_>) -> Option { let dsc = error.headers.dsc()?; let sampling_config = ctx @@ -83,6 +83,6 @@ async fn is_trace_fully_sampled(error: &ExpandedError, ctx: Context<'_>) -> Opti let rules = sampling_config.filter_rules(RuleType::Trace); - let evaluation = evaluator.match_rules(*dsc.trace_id, dsc, rules).await; + let evaluation = evaluator.match_rules(*dsc.trace_id, dsc, rules); Some(SamplingResult::from(evaluation).decision().is_keep()) } diff --git a/relay-server/src/processing/errors/mod.rs b/relay-server/src/processing/errors/mod.rs index 370d02afb75..6a4a6e249bc 100644 --- a/relay-server/src/processing/errors/mod.rs +++ b/relay-server/src/processing/errors/mod.rs @@ -119,7 +119,7 @@ impl processing::Processor for ErrorsProcessor { filter::filter(&error, ctx).reject(&error)?; - dynamic_sampling::apply(&mut error, ctx).await; + dynamic_sampling::apply(&mut error, ctx); let mut error = self.limiter.enforce_quotas(error, ctx).await?; diff --git a/relay-server/src/processing/legacy_spans/dynamic_sampling.rs b/relay-server/src/processing/legacy_spans/dynamic_sampling.rs index c1a856f9879..397f1a9b4fc 100644 --- a/relay-server/src/processing/legacy_spans/dynamic_sampling.rs +++ b/relay-server/src/processing/legacy_spans/dynamic_sampling.rs @@ -10,7 +10,7 @@ use crate::services::processor::ProcessingExtractedMetrics; use crate::statsd::RelayCounters; use crate::utils::SamplingResult; -pub async fn run( +pub fn run( spans: Managed, ctx: Context<'_>, ) -> ( @@ -23,7 +23,7 @@ pub async fn run( true => { // We only implement trace-based sampling rules for now, which can be computed // once for all spans in the envelope. - processing::utils::dynamic_sampling::run(spans.headers.dsc(), None, &ctx, None).await + processing::utils::dynamic_sampling::run(spans.headers.dsc(), None, &ctx) } false => SamplingResult::Pending, }; diff --git a/relay-server/src/processing/legacy_spans/mod.rs b/relay-server/src/processing/legacy_spans/mod.rs index e6b429bdc09..5bc76770f75 100644 --- a/relay-server/src/processing/legacy_spans/mod.rs +++ b/relay-server/src/processing/legacy_spans/mod.rs @@ -120,7 +120,7 @@ impl processing::Processor for LegacySpansProcessor { Either::Right(metrics) => return Ok(Output::metrics(metrics)), }; - let (mut spans, metrics) = match dynamic_sampling::run(spans, ctx).await { + let (mut spans, metrics) = match dynamic_sampling::run(spans, ctx) { (Some(spans), metrics) => (spans, metrics), (None, metrics) => return Ok(Output::metrics(metrics)), }; diff --git a/relay-server/src/processing/mod.rs b/relay-server/src/processing/mod.rs index 9fef0005e0f..7d3e63a14da 100644 --- a/relay-server/src/processing/mod.rs +++ b/relay-server/src/processing/mod.rs @@ -9,7 +9,6 @@ use relay_config::{Config, RelayMode}; use relay_dynamic_config::GlobalConfig; use relay_quotas::RateLimits; -use relay_sampling::evaluation::ReservoirCounters; use crate::managed::{Counted, Managed, ManagedEnvelope, Rejected}; use crate::metrics_extraction::ExtractedMetrics; @@ -87,11 +86,6 @@ pub struct Context<'a> { /// /// The caller needs to ensure the rate limits are not yet expired. pub rate_limits: &'a RateLimits, - - /// Counters used for getting more samples for a project on-demand. - /// - /// Reservoir counters are a legacy feature and will be removed in the near future. - pub reservoir_counters: &'a ReservoirCounters, } impl<'a> Context<'a> { @@ -134,7 +128,6 @@ impl Context<'static> { static GLOBAL_CONFIG: LazyLock = LazyLock::new(Default::default); static PROJECT_INFO: LazyLock = LazyLock::new(Default::default); static RATE_LIMITS: LazyLock = LazyLock::new(Default::default); - static RESERVOIR_COUNTERS: LazyLock = LazyLock::new(Default::default); Self { config: &CONFIG, @@ -142,7 +135,6 @@ impl Context<'static> { project_info: &PROJECT_INFO, sampling_project_info: None, rate_limits: &RATE_LIMITS, - reservoir_counters: &RESERVOIR_COUNTERS, } } } diff --git a/relay-server/src/processing/spans/dynamic_sampling.rs b/relay-server/src/processing/spans/dynamic_sampling.rs index c84fceb6327..488380fa2eb 100644 --- a/relay-server/src/processing/spans/dynamic_sampling.rs +++ b/relay-server/src/processing/spans/dynamic_sampling.rs @@ -106,11 +106,11 @@ pub fn validate_dsc(spans: &ExpandedSpans) -> Result<()> { /// /// All spans are evaluated in one go as they are required by the protocol to share the same /// DSC, which contains all the sampling relevant information. -pub async fn run( +pub fn run( mut spans: Managed, ctx: Context<'_>, ) -> Result, Managed> { - let sampling_result = compute(&spans, ctx).await; + let sampling_result = compute(&spans, ctx); relay_statsd::metric!( counter(RelayCounters::SamplingDecision) += 1, @@ -188,7 +188,7 @@ fn split_indexed_and_total( }) } -async fn compute(spans: &Managed, ctx: Context<'_>) -> SamplingResult { +fn compute(spans: &Managed, ctx: Context<'_>) -> SamplingResult { // The DSC is always required, we need it to evaluate all rules, if it is missing, // no rules can be applied -> we sample the item. let Some(dsc) = spans.headers.dsc() else { @@ -209,7 +209,6 @@ async fn compute(spans: &Managed, ctx: Context<'_>) -> SamplingRe return SamplingResult::NoMatch; }; - // Currently there is no support planned for reservoir sampling. let mut evaluator = SamplingEvaluator::new(Utc::now()); // Apply project rules before trace rules, to give projects a chance to override the trace root @@ -222,17 +221,14 @@ async fn compute(spans: &Managed, ctx: Context<'_>) -> SamplingRe // // The trace id gives us this property and it will also have the upside of consistently // sampling multiple segments of the same trace. - evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await { + evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules) { ControlFlow::Continue(evaluator) => evaluator, ControlFlow::Break(sampling_match) => return SamplingResult::Match(sampling_match), } } let rules = root_sampling_config.filter_rules(RuleType::Trace); - evaluator - .match_rules(*dsc.trace_id, dsc, rules) - .await - .into() + evaluator.match_rules(*dsc.trace_id, dsc, rules).into() } fn get_sampling_config(info: &ProjectInfo) -> Option<&SamplingConfig> { diff --git a/relay-server/src/processing/spans/mod.rs b/relay-server/src/processing/spans/mod.rs index 487defb3194..a693e5f08c8 100644 --- a/relay-server/src/processing/spans/mod.rs +++ b/relay-server/src/processing/spans/mod.rs @@ -182,7 +182,7 @@ impl processing::Processor for SpansProcessor { let spans = process::expand(spans)?; - let mut spans = match dynamic_sampling::run(spans, ctx).await { + let mut spans = match dynamic_sampling::run(spans, ctx) { Ok(spans) => spans, Err(metrics) => return Ok(Output::metrics(metrics)), }; diff --git a/relay-server/src/processing/trace_attachments/mod.rs b/relay-server/src/processing/trace_attachments/mod.rs index b9f806462f8..e2a8ddc315e 100644 --- a/relay-server/src/processing/trace_attachments/mod.rs +++ b/relay-server/src/processing/trace_attachments/mod.rs @@ -117,7 +117,7 @@ impl Processor for TraceAttachmentsProcessor { ) -> Result, Rejected> { let work = filter::feature_flag(work, ctx)?; - let work = process::sample(work, ctx).await?; + let work = process::sample(work, ctx)?; let work = process::expand(work); diff --git a/relay-server/src/processing/trace_attachments/process.rs b/relay-server/src/processing/trace_attachments/process.rs index 0aca2a7fa18..d5ecbcb2c63 100644 --- a/relay-server/src/processing/trace_attachments/process.rs +++ b/relay-server/src/processing/trace_attachments/process.rs @@ -72,14 +72,13 @@ pub fn parse_and_validate(item: &Item) -> Result, ctx: Context<'_>, ) -> Result, Rejected> { let event = None; // only apply trace-based rules. - let reservoir = None; // legacy - let result = dynamic_sampling::run(work.headers.dsc(), event, &ctx, reservoir).await; + let result = dynamic_sampling::run(work.headers.dsc(), event, &ctx); let server_sample_rate = result.sample_rate(); work.try_map(|work, _| { diff --git a/relay-server/src/processing/transactions/mod.rs b/relay-server/src/processing/transactions/mod.rs index ba1b1e4feaf..07bfc92018c 100644 --- a/relay-server/src/processing/transactions/mod.rs +++ b/relay-server/src/processing/transactions/mod.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::Metrics; use relay_quotas::RateLimits; -use relay_redis::AsyncRedisClient; use relay_sampling::evaluation::SamplingDecision; use crate::envelope::ItemType; @@ -71,20 +70,14 @@ impl OutcomeError for Error { pub struct TransactionProcessor { limiter: Arc, geoip_lookup: GeoIpLookup, - quotas_client: Option, } impl TransactionProcessor { /// Creates a new transaction processor. - pub fn new( - limiter: Arc, - geoip_lookup: GeoIpLookup, - quotas_client: Option, - ) -> Self { + pub fn new(limiter: Arc, geoip_lookup: GeoIpLookup) -> Self { Self { limiter, geoip_lookup, - quotas_client, } } } @@ -147,32 +140,30 @@ impl Processor for TransactionProcessor { relay_log::trace!("Filter transaction"); let filters_status = process::run_inbound_filters(&tx, ctx)?; - let quotas_client = self.quotas_client.as_ref(); - relay_log::trace!("Processing profile"); process::process_profile(&mut tx, ctx); relay_log::trace!("Sample transaction"); - let (tx, server_sample_rate) = - match process::run_dynamic_sampling(tx, ctx, filters_status, quotas_client).await? { - SamplingOutput::Keep { - payload, - sample_rate, - } => (payload, sample_rate), - SamplingOutput::Drop { - metrics, - mut profile, - } => { - // Remaining profile needs to be rate limited: - if let Some(p) = profile { - profile = self.limiter.enforce_quotas(p, ctx).await.ok(); - } - return Ok(Output { - main: profile.map(TransactionOutput::Profile), - metrics: Some(metrics), - }); + let (tx, server_sample_rate) = match process::run_dynamic_sampling(tx, ctx, filters_status)? + { + SamplingOutput::Keep { + payload, + sample_rate, + } => (payload, sample_rate), + SamplingOutput::Drop { + metrics, + mut profile, + } => { + // Remaining profile needs to be rate limited: + if let Some(p) = profile { + profile = self.limiter.enforce_quotas(p, ctx).await.ok(); } - }; + return Ok(Output { + main: profile.map(TransactionOutput::Profile), + metrics: Some(metrics), + }); + } + }; // Need to scrub the transaction before extracting spans. relay_log::trace!("Scrubbing transaction"); diff --git a/relay-server/src/processing/transactions/process.rs b/relay-server/src/processing/transactions/process.rs index 64ea65b2386..589b5b9aa69 100644 --- a/relay-server/src/processing/transactions/process.rs +++ b/relay-server/src/processing/transactions/process.rs @@ -1,13 +1,10 @@ -use std::sync::Arc; - use relay_base_schema::events::EventType; use relay_event_normalization::GeoIpLookup; use relay_event_schema::protocol::{Event, Metrics}; use relay_profiling::{ProfileError, ProfileType}; use relay_protocol::Annotated; use relay_quotas::DataCategory; -use relay_redis::AsyncRedisClient; -use relay_sampling::evaluation::{ReservoirEvaluator, SamplingDecision}; +use relay_sampling::evaluation::SamplingDecision; use relay_statsd::metric; use smallvec::smallvec; @@ -230,14 +227,12 @@ pub enum SamplingOutput { } /// Computes the sampling decision for a transaction and associated items. -pub async fn run_dynamic_sampling( +pub fn run_dynamic_sampling( payload: Managed>, ctx: Context<'_>, filters_status: FiltersStatus, - quotas_client: Option<&AsyncRedisClient>, ) -> Result> { - let sampling_result = - make_dynamic_sampling_decision(&payload, ctx, filters_status, quotas_client).await; + let sampling_result = make_dynamic_sampling_decision(&payload, ctx, filters_status); let sampling_match = match sampling_result { SamplingResult::Match(m) if m.decision().is_drop() => m, @@ -273,14 +268,12 @@ pub async fn run_dynamic_sampling( } /// Computes the dynamic sampling decision for the unit of work, but does not perform action on data. -async fn make_dynamic_sampling_decision( +fn make_dynamic_sampling_decision( work: &Managed>, ctx: Context<'_>, filters_status: FiltersStatus, - quotas_client: Option<&AsyncRedisClient>, ) -> SamplingResult { - let sampling_result = - do_make_dynamic_sampling_decision(work, ctx, filters_status, quotas_client).await; + let sampling_result = do_make_dynamic_sampling_decision(work, ctx, filters_status); relay_statsd::metric!( counter(RelayCounters::SamplingDecision) += 1, decision = sampling_result.decision().as_str(), @@ -289,11 +282,10 @@ async fn make_dynamic_sampling_decision( sampling_result } -async fn do_make_dynamic_sampling_decision( +fn do_make_dynamic_sampling_decision( work: &Managed>, ctx: Context<'_>, filters_status: FiltersStatus, - #[allow(unused)] quotas_client: Option<&AsyncRedisClient>, ) -> SamplingResult { // Always run dynamic sampling on processing Relays, // but delay decision until inbound filters have been fully processed. @@ -303,19 +295,7 @@ async fn do_make_dynamic_sampling_decision( return SamplingResult::Pending; } - #[allow(unused_mut)] - let mut reservoir = ReservoirEvaluator::new(Arc::clone(ctx.reservoir_counters)); - #[cfg(feature = "processing")] - if let Some(quotas_client) = quotas_client { - reservoir.set_redis(work.scoping().organization_id, quotas_client); - } - utils::dynamic_sampling::run( - work.headers.dsc(), - work.event.value(), - &ctx, - Some(&reservoir), - ) - .await + utils::dynamic_sampling::run(work.headers.dsc(), work.event.value(), &ctx) } type IndexedAndMetrics = ( diff --git a/relay-server/src/processing/utils/dynamic_sampling.rs b/relay-server/src/processing/utils/dynamic_sampling.rs index 599403e207b..73fb6cb27d5 100644 --- a/relay-server/src/processing/utils/dynamic_sampling.rs +++ b/relay-server/src/processing/utils/dynamic_sampling.rs @@ -5,18 +5,17 @@ use chrono::Utc; use relay_dynamic_config::ErrorBoundary; use relay_event_schema::protocol::Event; use relay_sampling::config::RuleType; -use relay_sampling::evaluation::{ReservoirEvaluator, SamplingEvaluator}; +use relay_sampling::evaluation::SamplingEvaluator; use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use crate::processing::Context; use crate::utils::SamplingResult; /// Computes the sampling decision on an incoming event -pub async fn run( +pub fn run( dsc: Option<&DynamicSamplingContext>, event: Option<&Event>, ctx: &Context<'_>, - reservoir: Option<&ReservoirEvaluator<'_>>, ) -> SamplingResult { let sampling_config = match ctx.project_info.config.sampling { Some(ErrorBoundary::Ok(ref config)) if !config.unsupported() => Some(config), @@ -31,19 +30,16 @@ pub async fn run( compute_sampling_decision( ctx.config.processing_enabled(), - reservoir, sampling_config, event, root_config, dsc, ) - .await } /// Computes the sampling decision on the incoming envelope. -async fn compute_sampling_decision( +fn compute_sampling_decision( processing_enabled: bool, - reservoir: Option<&ReservoirEvaluator<'_>>, sampling_config: Option<&SamplingConfig>, event: Option<&Event>, root_sampling_config: Option<&SamplingConfig>, @@ -65,16 +61,13 @@ async fn compute_sampling_decision( } } - let mut evaluator = match reservoir { - Some(reservoir) => SamplingEvaluator::new_with_reservoir(Utc::now(), reservoir), - None => SamplingEvaluator::new(Utc::now()), - }; + let mut evaluator = SamplingEvaluator::new(Utc::now()); if let (Some(event), Some(sampling_state)) = (event, sampling_config) && let Some(seed) = event.id.value().map(|id| id.0) { let rules = sampling_state.filter_rules(RuleType::Transaction); - evaluator = match evaluator.match_rules(seed, event, rules).await { + evaluator = match evaluator.match_rules(seed, event, rules) { ControlFlow::Continue(evaluator) => evaluator, ControlFlow::Break(sampling_match) => { return SamplingResult::Match(sampling_match); @@ -84,7 +77,7 @@ async fn compute_sampling_decision( if let (Some(dsc), Some(sampling_state)) = (dsc, sampling_config) { let rules = sampling_state.filter_rules(RuleType::Project); - evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules).await { + evaluator = match evaluator.match_rules(*dsc.trace_id, dsc, rules) { ControlFlow::Continue(evaluator) => evaluator, ControlFlow::Break(sampling_match) => { return SamplingResult::Match(sampling_match); @@ -94,10 +87,7 @@ async fn compute_sampling_decision( if let (Some(dsc), Some(sampling_state)) = (dsc, root_sampling_config) { let rules = sampling_state.filter_rules(RuleType::Trace); - return evaluator - .match_rules(*dsc.trace_id, dsc, rules) - .await - .into(); + return evaluator.match_rules(*dsc.trace_id, dsc, rules).into(); } SamplingResult::NoMatch @@ -177,15 +167,8 @@ mod tests { // TODO: This does not test if the sampling decision is actually applied. This should be // refactored to send a proper Envelope in and call process_state to cover the full // pipeline. - let res = compute_sampling_decision( - false, - None, - Some(&sampling_config), - Some(&event), - None, - None, - ) - .await; + let res = + compute_sampling_decision(false, Some(&sampling_config), Some(&event), None, None); assert_eq!(res.decision().is_keep(), should_keep); } } @@ -211,13 +194,11 @@ mod tests { let res = compute_sampling_decision( false, - None, Some(&sampling_config), Some(&event), None, Some(&mock_dsc()), - ) - .await; + ); assert!(res.is_match()); } } @@ -249,21 +230,12 @@ mod tests { }; // Unsupported rule should result in no match if processing is not enabled. - let res = compute_sampling_decision( - false, - None, - Some(&sampling_config), - Some(&event), - None, - None, - ) - .await; + let res = + compute_sampling_decision(false, Some(&sampling_config), Some(&event), None, None); assert!(res.is_no_match()); // Match if processing is enabled. - let res = - compute_sampling_decision(true, None, Some(&sampling_config), Some(&event), None, None) - .await; + let res = compute_sampling_decision(true, Some(&sampling_config), Some(&event), None, None); assert!(res.is_match()); } @@ -285,9 +257,7 @@ mod tests { ..SamplingConfig::new() }; - let res = - compute_sampling_decision(false, None, None, None, Some(&sampling_config), Some(&dsc)) - .await; + let res = compute_sampling_decision(false, None, None, Some(&sampling_config), Some(&dsc)); assert_eq!(get_sampling_match(res).sample_rate(), 0.2); } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 5136e6ca68a..c79ce7a69c8 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -614,13 +614,11 @@ impl EnvelopeBufferService { return Ok(()); } - let reservoir_counters = own_project.reservoir_counters().clone(); services.envelope_processor.send(ProcessEnvelope { envelope: managed_envelope.into(), project_info: own_project_info.clone(), rate_limits: own_project.rate_limits().current_limits(), sampling_project_info: sampling_project_info.clone(), - reservoir_counters, }); Ok(()) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 10cee84b0a6..bcfb479ead8 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -27,7 +27,7 @@ use relay_event_schema::protocol::{ClientReport, EventId, SpanV2}; use relay_filter::FilterStatKey; use relay_metrics::{Bucket, BucketMetadata, BucketView, BucketsView, MetricNamespace}; use relay_quotas::{DataCategory, RateLimits, Scoping}; -use relay_sampling::evaluation::{ReservoirCounters, SamplingDecision}; +use relay_sampling::evaluation::SamplingDecision; use relay_statsd::metric; use relay_system::{Addr, FromMessage, NoResponse, Service}; use reqwest::header; @@ -674,8 +674,6 @@ pub struct ProcessEnvelope { pub rate_limits: Arc, /// Root sampling project info. pub sampling_project_info: Option>, - /// Sampling reservoir counters. - pub reservoir_counters: ReservoirCounters, } /// Like a [`ProcessEnvelope`], but with an envelope which has been grouped. @@ -988,8 +986,8 @@ impl EnvelopeProcessorService { } #[cfg(feature = "processing")] - let rate_limiter = redis.as_ref().map(|redis| { - RedisRateLimiter::new(redis.quotas.clone()) + let rate_limiter = redis.map(|redis| { + RedisRateLimiter::new(redis.quotas) .max_limit(config.max_rate_limit()) .cache(config.quota_cache_ratio(), config.quota_cache_max()) }); @@ -1026,10 +1024,6 @@ impl EnvelopeProcessorService { transactions: TransactionProcessor::new( Arc::clone("a_limiter), geoip_lookup.clone(), - #[cfg(feature = "processing")] - redis.map(|r| r.quotas), - #[cfg(not(feature = "processing"))] - None, ), profile_chunks: ProfileChunksProcessor::new(Arc::clone("a_limiter)), trace_attachments: TraceAttachmentsProcessor::new(Arc::clone("a_limiter)), @@ -1360,7 +1354,6 @@ impl EnvelopeProcessorService { project_info: &message.project_info, sampling_project_info: message.sampling_project_info.as_deref(), rate_limits: &message.rate_limits, - reservoir_counters: &message.reservoir_counters, }; let message = ProcessEnvelopeGrouped { diff --git a/relay-server/src/services/projects/cache/project.rs b/relay-server/src/services/projects/cache/project.rs index 0dbfa710f79..a09d434ed80 100644 --- a/relay-server/src/services/projects/cache/project.rs +++ b/relay-server/src/services/projects/cache/project.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use relay_config::Config; use relay_quotas::{CachedRateLimits, DataCategory, MetricNamespaceScoping, RateLimits}; -use relay_sampling::evaluation::ReservoirCounters; use crate::Envelope; use crate::envelope::ItemType; @@ -33,11 +32,6 @@ impl<'a> Project<'a> { self.shared.cached_rate_limits() } - /// Returns a reference to the currently reservoir counters. - pub fn reservoir_counters(&self) -> &ReservoirCounters { - self.shared.reservoir_counters() - } - /// Checks the envelope against project configuration and rate limits. /// /// When `fetched`, then the project state is ensured to be up to date. When `cached`, an outdated diff --git a/relay-server/src/services/projects/cache/state.rs b/relay-server/src/services/projects/cache/state.rs index 029edab16be..d0fb1eac99a 100644 --- a/relay-server/src/services/projects/cache/state.rs +++ b/relay-server/src/services/projects/cache/state.rs @@ -9,7 +9,6 @@ use tokio::time::Instant; use arc_swap::ArcSwap; use relay_base_schema::project::ProjectKey; use relay_quotas::CachedRateLimits; -use relay_sampling::evaluation::ReservoirCounters; use relay_statsd::metric; use crate::services::projects::project::{ProjectState, Revision}; @@ -364,11 +363,6 @@ impl SharedProject { &self.0.rate_limits } - /// Returns a reference to the contained [`ReservoirCounters`]. - pub fn reservoir_counters(&self) -> &ReservoirCounters { - &self.0.reservoir_counters - } - /// Waits for the event of a changed project state, triggered by [`SharedProjectState::set_project_state`]. /// /// Note that the content of this instance does not change when the event is triggered. @@ -615,26 +609,9 @@ impl SharedProjectState { let prev = self.0.rcu(|stored| SharedProjectStateInner { state: state.clone(), rate_limits: Arc::clone(&stored.rate_limits), - reservoir_counters: Arc::clone(&stored.reservoir_counters), notify: Arc::clone(&stored.notify), }); - // Try clean expired reservoir counters. - // - // We do it after the `rcu`, to not re-run this more often than necessary. - if let Some(state) = state.enabled() { - let config = state.config.sampling.as_ref(); - if let Some(config) = config.and_then(|eb| eb.as_ref().ok()) { - // We can safely use previous here, the `rcu` just replaced the state, the - // reservoir counters did not change. - // - // `try_lock` to not potentially block, it's a best effort cleanup. - if let Ok(mut counters) = prev.reservoir_counters.try_lock() { - counters.retain(|key, _| config.rules.iter().any(|rule| rule.id == *key)); - } - } - } - // Finally, notify listeners: prev.notify.notify_waiters(); } @@ -658,7 +635,6 @@ impl SharedProjectState { struct SharedProjectStateInner { state: ProjectState, rate_limits: Arc, - reservoir_counters: ReservoirCounters, notify: Arc, } diff --git a/relay-server/src/utils/dynamic_sampling.rs b/relay-server/src/utils/dynamic_sampling.rs index ed83669c72b..334c74a01b0 100644 --- a/relay-server/src/utils/dynamic_sampling.rs +++ b/relay-server/src/utils/dynamic_sampling.rs @@ -59,7 +59,7 @@ impl SamplingResult { } } -impl From>> for SamplingResult { +impl From> for SamplingResult { fn from(value: ControlFlow) -> Self { match value { ControlFlow::Break(sampling_match) => Self::Match(sampling_match), @@ -134,7 +134,6 @@ mod tests { let result: SamplingResult = SamplingEvaluator::new(Utc::now()) .match_rules(Uuid::default(), &event, rules.iter()) - .await .into(); assert!(result.is_match()); @@ -148,7 +147,6 @@ mod tests { let result: SamplingResult = SamplingEvaluator::new(Utc::now()) .match_rules(Uuid::default(), &event, rules.iter()) - .await .into(); assert!(result.is_match()); @@ -171,7 +169,6 @@ mod tests { let result: SamplingResult = SamplingEvaluator::new(Utc::now()) .match_rules(Uuid::default(), &event, rules.iter()) - .await .into(); assert!(result.is_no_match()); @@ -186,7 +183,6 @@ mod tests { let result: SamplingResult = SamplingEvaluator::new(Utc::now()) .match_rules(Uuid::default(), &dsc, rules.iter()) - .await .into(); assert!(result.is_match());