Skip to content

Commit

Permalink
feat: matchable proxy table
Browse files Browse the repository at this point in the history
  • Loading branch information
Banyc committed Feb 20, 2024
1 parent 6ddd5b7 commit 319a85c
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ monitor_table = { git = "https://github.com/Banyc/monitor_table.git", rev = "f9c
mptcp = { git = "https://github.com/Banyc/mptcp.git", rev = "8c0b8ee35bc7d570de272f5da7af6742b460e23a" }
once_cell = "1"
openssl = "0.10"
regex = "1"
serde = "1"
strict-num = "0.2"
swap = { git = "https://github.com/Banyc/swap.git", rev = "d10a8b5b10503fa6ebac523cfcaa4d62135a665f" }
Expand Down
1 change: 1 addition & 0 deletions access_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ monitor_table = { workspace = true }
pin-project-lite = "0.2"
protocol = { path = "../protocol" }
proxy_client = { path = "../proxy_client" }
regex = { workspace = true }
serde = { workspace = true, features = ["derive"] }
table_log = { workspace = true }
thiserror = { workspace = true }
Expand Down
18 changes: 15 additions & 3 deletions access_server/src/socks5/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,12 @@ impl Socks5ServerTcpAccess {
async fn establish_proxy_chain(
&self,
destination: InternetAddr,
) -> Result<(ConnAndAddr, Option<tokio_chacha20::config::Config>), StreamEstablishError> {
let proxy_chain = self.proxy_table.choose_chain();
) -> Result<(ConnAndAddr, Option<tokio_chacha20::config::Config>), EstablishProxyChainError>
{
let Some(proxy_table_group) = self.proxy_table.group(&destination) else {
return Err(EstablishProxyChainError::NoProxy);
};
let proxy_chain = proxy_table_group.choose_chain();
let res = proxy_client::stream::establish(
&proxy_chain.chain,
StreamAddr {
Expand All @@ -547,6 +551,14 @@ impl Socks5ServerTcpAccess {
}
}

#[derive(Debug, Error)]
pub enum EstablishProxyChainError {
#[error("No proxy")]
NoProxy,
#[error("{0}")]
StreamEstablish(#[from] StreamEstablishError),
}

pub enum EstablishResult<S> {
Blocked {
destination: InternetAddr,
Expand Down Expand Up @@ -604,7 +616,7 @@ pub enum EstablishError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("Failed to establish proxy chain: {0}")]
EstablishProxyChain(#[from] StreamEstablishError),
EstablishProxyChain(#[from] EstablishProxyChainError),
#[error("Command BIND not supported")]
CmdBindNotSupported,
#[error("No UDP server available")]
Expand Down
7 changes: 6 additions & 1 deletion access_server/src/socks5/server/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ impl Socks5ServerUdpAccess {
downstream_writer: UdpDownstreamWriter,
) -> Result<(), AccessProxyError> {
// Connect to upstream
let proxy_chain = self.proxy_table.choose_chain();
let Some(proxy_table_group) = self.proxy_table.group(&flow.flow().upstream.0) else {
return Err(AccessProxyError::NoProxy);
};
let proxy_chain = proxy_table_group.choose_chain();
let upstream =
UdpProxyClient::establish(proxy_chain.chain.clone(), flow.flow().upstream.0.clone())
.await?;
Expand Down Expand Up @@ -174,6 +177,8 @@ impl Socks5ServerUdpAccess {

#[derive(Debug, Error)]
pub enum AccessProxyError {
#[error("No proxy")]
NoProxy,
#[error("Failed to establish proxy chain: {0}")]
Establish(#[from] EstablishError),
}
Expand Down
41 changes: 35 additions & 6 deletions access_server/src/stream/proxy_table.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use common::{
proxy_table::{ProxyTable, ProxyTableError},
filter::MatcherBuilder,
proxy_table::{ProxyTable, ProxyTableError, ProxyTableGroup},
stream::proxy_table::{
StreamProxyConfig, StreamProxyConfigBuildError, StreamProxyTable,
StreamProxyConfig, StreamProxyConfigBuildError, StreamProxyTable, StreamProxyTableGroup,
StreamWeightedProxyChainBuilder,
},
};
Expand All @@ -19,18 +20,43 @@ use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct StreamProxyTableBuilder {
pub groups: Vec<StreamProxyTableGroupBuilder>,
}
impl StreamProxyTableBuilder {
pub fn build(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
stream_context: &ConcreteStreamContext,
cancellation: CancellationToken,
) -> Result<StreamProxyTable<ConcreteStreamType>, StreamProxyTableBuildError> {
let mut built = vec![];
for group in self.groups {
let g = group.build(stream_proxy, stream_context, cancellation.clone())?;
built.push(g);
}
Ok(ProxyTable::new(built))
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct StreamProxyTableGroupBuilder {
pub matcher: MatcherBuilder,
pub chains: Vec<StreamWeightedProxyChainBuilder<ConcreteStreamAddrStr>>,
pub trace_rtt: bool,
pub active_chains: Option<NonZeroUsize>,
}

impl StreamProxyTableBuilder {
impl StreamProxyTableGroupBuilder {
pub fn build(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
stream_context: &ConcreteStreamContext,
cancellation: CancellationToken,
) -> Result<StreamProxyTable<ConcreteStreamType>, StreamProxyTableBuildError> {
) -> Result<StreamProxyTableGroup<ConcreteStreamType>, StreamProxyTableBuildError> {
let matcher = self
.matcher
.build()
.map_err(StreamProxyTableBuildError::Matcher)?;
let chains = self
.chains
.into_iter()
Expand All @@ -41,7 +67,8 @@ impl StreamProxyTableBuilder {
true => Some(StreamTracer::new(stream_context.clone())),
false => None,
};
Ok(ProxyTable::new(
Ok(ProxyTableGroup::new(
matcher,
chains,
tracer,
self.active_chains,
Expand All @@ -52,6 +79,8 @@ impl StreamProxyTableBuilder {

#[derive(Debug, Error)]
pub enum StreamProxyTableBuildError {
#[error("Matcher: {0}")]
Matcher(#[source] regex::Error),
#[error("Chain config is invalid: {0}")]
ChainConfig(#[source] StreamProxyConfigBuildError),
#[error("{0}")]
Expand Down
13 changes: 11 additions & 2 deletions access_server/src/stream/streams/http_tunnel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,11 @@ impl HttpAccess {
}

// Establish proxy chain
let proxy_chain = self.proxy_table.choose_chain();
let Some(proxy_table_group) = self.proxy_table.group(&addr.address) else {
trace!(?addr, "No proxy {}", method);
return Ok(respond_with_rejection());
};
let proxy_chain = proxy_table_group.choose_chain();
let upstream = establish(&proxy_chain.chain, addr.clone(), &self.stream_context).await?;

let session_guard = self.stream_context.session_table.as_ref().map(|s| {
Expand Down Expand Up @@ -500,7 +504,10 @@ impl HttpConnect {
address: address.clone(),
stream_type: ConcreteStreamType::Tcp,
};
let proxy_chain = self.proxy_table.choose_chain();
let Some(proxy_table_group) = self.proxy_table.group(&address) else {
return Err(HttpConnectError::NoProxy);
};
let proxy_chain = proxy_table_group.choose_chain();
let upstream = establish(
&proxy_chain.chain,
destination.clone(),
Expand Down Expand Up @@ -534,6 +541,8 @@ impl HttpConnect {

#[derive(Debug, Error)]
pub enum HttpConnectError {
#[error("No proxy")]
NoProxy,
#[error("Failed to establish proxy chain")]
EstablishProxyChain(#[from] StreamEstablishError),
}
Expand Down
7 changes: 6 additions & 1 deletion access_server/src/stream/streams/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ impl TcpAccess {
where
S: IoStream + IoAddr,
{
let proxy_chain = self.proxy_table.choose_chain();
let Some(proxy_table_group) = self.proxy_table.group(&self.destination.address) else {
return Err(ProxyError::NoProxy);
};
let proxy_chain = proxy_table_group.choose_chain();
let upstream = establish(
&proxy_chain.chain,
self.destination.clone(),
Expand Down Expand Up @@ -168,6 +171,8 @@ impl TcpAccess {

#[derive(Debug, Error)]
pub enum ProxyError {
#[error("No proxy")]
NoProxy,
#[error("Failed to get downstream address: {0}")]
DownstreamAddr(#[source] io::Error),
#[error("Failed to establish proxy chain: {0}")]
Expand Down
7 changes: 6 additions & 1 deletion access_server/src/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ impl UdpAccess {
downstream_writer: UdpDownstreamWriter,
) -> Result<(), AccessProxyError> {
// Connect to upstream
let proxy_chain = self.proxy_table.choose_chain();
let Some(proxy_table_group) = self.proxy_table.group(&flow.flow().upstream.0) else {
return Err(AccessProxyError::NoProxy);
};
let proxy_chain = proxy_table_group.choose_chain();
let upstream =
UdpProxyClient::establish(proxy_chain.chain.clone(), self.destination.clone()).await?;
let upstream_remote = upstream.remote_addr().clone();
Expand Down Expand Up @@ -174,6 +177,8 @@ impl UdpAccess {

#[derive(Debug, Error)]
pub enum AccessProxyError {
#[error("No proxy")]
NoProxy,
#[error("Failed to establish proxy chain: {0}")]
Establish(#[from] EstablishError),
#[error("Failed to copy: {0}")]
Expand Down
41 changes: 35 additions & 6 deletions access_server/src/udp/proxy_table.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use common::{
proxy_table::{ProxyTable, ProxyTableError},
filter::MatcherBuilder,
proxy_table::{ProxyTable, ProxyTableError, ProxyTableGroup},
udp::proxy_table::{
UdpProxyConfig, UdpProxyConfigBuildError, UdpProxyTable, UdpWeightedProxyChainBuilder,
UdpProxyConfig, UdpProxyConfigBuildError, UdpProxyTable, UdpProxyTableGroup,
UdpWeightedProxyChainBuilder,
},
};
use proxy_client::udp::UdpTracer;
Expand All @@ -14,17 +16,41 @@ use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct UdpProxyTableBuilder {
pub groups: Vec<UdpProxyTableGroupBuilder>,
}
impl UdpProxyTableBuilder {
pub fn build(
self,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
cancellation: CancellationToken,
) -> Result<UdpProxyTable, UdpProxyTableBuildError> {
let mut built = vec![];
for group in self.groups {
let g = group.build(udp_proxy, cancellation.clone())?;
built.push(g);
}
Ok(ProxyTable::new(built))
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct UdpProxyTableGroupBuilder {
pub matcher: MatcherBuilder,
pub chains: Vec<UdpWeightedProxyChainBuilder>,
pub trace_rtt: bool,
pub active_chains: Option<NonZeroUsize>,
}

impl UdpProxyTableBuilder {
impl UdpProxyTableGroupBuilder {
pub fn build(
self,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
cancellation: CancellationToken,
) -> Result<UdpProxyTable, UdpProxyTableBuildError> {
) -> Result<UdpProxyTableGroup, UdpProxyTableBuildError> {
let matcher = self
.matcher
.build()
.map_err(UdpProxyTableBuildError::Matcher)?;
let chains = self
.chains
.into_iter()
Expand All @@ -35,7 +61,8 @@ impl UdpProxyTableBuilder {
true => Some(UdpTracer::new()),
false => None,
};
Ok(ProxyTable::new(
Ok(ProxyTableGroup::new(
matcher,
chains,
tracer,
self.active_chains,
Expand All @@ -46,6 +73,8 @@ impl UdpProxyTableBuilder {

#[derive(Debug, Error)]
pub enum UdpProxyTableBuildError {
#[error("Matcher: {0}")]
Matcher(#[source] regex::Error),
#[error("Chain config is invalid: {0}")]
ChainConfig(#[source] UdpProxyConfigBuildError),
#[error("{0}")]
Expand Down
2 changes: 1 addition & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ monitor_table = { workspace = true }
once_cell = { workspace = true }
pin-project-lite = "0.2"
rand = "0.8"
regex = "1"
regex = { workspace = true }
scopeguard = "1"
serde = { workspace = true, features = ["derive", "rc"] }
slotmap = "1"
Expand Down
8 changes: 8 additions & 0 deletions common/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ impl MatcherBuilderKind {

#[derive(Debug, Clone)]
pub struct Matcher(MatcherKind);
impl Matcher {
pub fn matches(&self, addr: &InternetAddr) -> bool {
match addr.deref() {
InternetAddrKind::SocketAddr(addr) => self.0.is_match_ip(*addr),
InternetAddrKind::DomainName { addr, port } => self.0.is_match_domain_name(addr, *port),
}
}
}

#[derive(Debug, Clone)]
enum MatcherKind {
Expand Down
Loading

0 comments on commit 319a85c

Please sign in to comment.