Skip to content

Commit

Permalink
perf: reduce multithreading overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
Banyc committed Jan 14, 2024
1 parent 8ee5d8e commit 7e70e90
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 21 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ once_cell = "1"
pin-project-lite = "0.2"
# quinn = "0.10.1"
rand = "0.8"
rayon = "1"
regex = "1"
scopeguard = "1"
serde = { workspace = true, features = ["derive", "rc"] }
Expand Down
13 changes: 5 additions & 8 deletions common/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
sync::Arc,
};

use rayon::prelude::*;
use regex::Regex;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -230,7 +229,7 @@ impl MatcherKind {
addr_matcher.is_match_domain_name(addr)
}
MatcherKind::Many(matchers) => matchers
.par_iter()
.iter()
.any(|matcher| matcher.is_match_domain_name(addr, port)),
}
}
Expand All @@ -246,9 +245,7 @@ impl MatcherKind {
}
addr_matcher.is_match_ip(addr.ip())
}
MatcherKind::Many(matchers) => {
matchers.par_iter().any(|matcher| matcher.is_match_ip(addr))
}
MatcherKind::Many(matchers) => matchers.iter().any(|matcher| matcher.is_match_ip(addr)),
}
}
}
Expand Down Expand Up @@ -293,15 +290,15 @@ impl AddrListMatcher {
pub fn is_match_domain_name(&self, addr: &str) -> bool {
match self {
Self::Some(matchers) => matchers
.par_iter()
.iter()
.any(|matcher| matcher.is_match_domain_name(addr)),
Self::Any => true,
}
}

pub fn is_match_ip(&self, addr: IpAddr) -> bool {
match self {
Self::Some(matchers) => matchers.par_iter().any(|matcher| matcher.is_match_ip(addr)),
Self::Some(matchers) => matchers.iter().any(|matcher| matcher.is_match_ip(addr)),
Self::Any => true,
}
}
Expand Down Expand Up @@ -403,7 +400,7 @@ enum PortListMatcher {
impl PortListMatcher {
pub fn is_match(&self, port: u16) -> bool {
match self {
Self::Some(matcher) => matcher.par_iter().any(|range| range.is_match(port)),
Self::Some(matcher) => matcher.iter().any(|range| range.is_match(port)),
Self::Any => true,
}
}
Expand Down
20 changes: 9 additions & 11 deletions common/src/proxy_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
};

use rand::Rng;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -40,7 +39,6 @@ where
{
assert!(!nodes.is_empty());
let mut pairs = (0..nodes.len() - 1)
.into_par_iter()
.map(|i| {
let node = &nodes[i];
let next_node = &nodes[i + 1];
Expand Down Expand Up @@ -78,7 +76,7 @@ where
where
T: Tracer<Address = A> + Send + Sync + 'static,
{
let cum_weight = chains.par_iter().map(|c| c.weight).sum();
let cum_weight = chains.iter().map(|c| c.weight).sum();
if cum_weight == 0 {
return Err(ProxyTableError::ZeroAccumulatedWeight);
}
Expand Down Expand Up @@ -119,7 +117,7 @@ where
None => {
let scores: Arc<[_]> = self.scores().into();
info!(?scores, "Calculated scores");
let sum = scores.par_iter().map(|(_, s)| *s).sum::<f64>();
let sum = scores.iter().map(|(_, s)| *s).sum::<f64>();
let scores = Scores { scores, sum };
self.score_store.write().unwrap().set(scores.clone());
scores
Expand All @@ -144,18 +142,18 @@ where
fn scores(&self) -> Vec<(usize, f64)> {
let weights_hat = self
.chains
.par_iter()
.iter()
.map(|c| c.weighted().weight as f64 / self.cum_weight.get() as f64)
.collect::<Vec<_>>();

let rtt = self
.chains
.par_iter()
.iter()
.map(|c| c.rtt().map(|r| r.as_secs_f64()))
.collect::<Vec<_>>();
let rtt_hat = normalize(&rtt);

let losses = self.chains.par_iter().map(|c| c.loss()).collect::<Vec<_>>();
let losses = self.chains.iter().map(|c| c.loss()).collect::<Vec<_>>();
let losses_hat = normalize(&losses);

let mut scores = (0..self.chains.len())
Expand All @@ -177,21 +175,21 @@ pub enum ProxyTableError {
}

fn normalize(list: &[Option<f64>]) -> Vec<f64> {
let sum_some: f64 = list.par_iter().map(|x| x.unwrap_or(0.)).sum();
let count_some = list.par_iter().map(|x| x.map(|_| 1).unwrap_or(0)).sum();
let sum_some: f64 = list.iter().map(|x| x.unwrap_or(0.)).sum();
let count_some = list.iter().map(|x| x.map(|_| 1).unwrap_or(0)).sum();
let hat = match count_some {
0 => {
let hat_mean = 1. / list.len() as f64;
(0..list.len()).map(|_| hat_mean).collect::<Vec<_>>()
}
_ => {
let mean = sum_some / count_some as f64;
let sum: f64 = list.par_iter().map(|x| x.unwrap_or(mean)).sum();
let sum: f64 = list.iter().map(|x| x.unwrap_or(mean)).sum();
if sum == 0. {
(0..list.len()).map(|_| 0.).collect::<Vec<_>>()
} else {
let hat_mean = mean / sum;
list.par_iter()
list.iter()
.map(|x| x.map(|x| x / sum).unwrap_or(hat_mean))
.collect::<Vec<_>>()
}
Expand Down

0 comments on commit 7e70e90

Please sign in to comment.