Skip to content

Commit

Permalink
refactor: merge filter with proxy table; split proxy group from proxy…
Browse files Browse the repository at this point in the history
… table
  • Loading branch information
Banyc committed Feb 21, 2024
1 parent 319a85c commit 78d7779
Show file tree
Hide file tree
Showing 33 changed files with 1,583 additions and 1,203 deletions.
195 changes: 144 additions & 51 deletions access_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,85 @@ use std::{collections::HashMap, sync::Arc};
use common::{
config::{merge_map, Merge},
error::{AnyError, AnyResult},
filter::{self, FilterBuilder, MatcherBuilder},
filter::{self, MatcherBuilder},
loading,
stream::proxy_table::StreamProxyConfig,
udp::proxy_table::UdpProxyConfig,
udp::proxy_table::{UdpProxyConfig, UdpProxyGroupBuilder, UdpProxyTableBuilder},
};
use protocol::{context::ConcreteContext, stream::addr::ConcreteStreamType};
use protocol::{
context::ConcreteContext,
stream::proxy_table::{StreamProxyConfig, StreamProxyGroupBuilder, StreamProxyTableBuilder},
};
use proxy_client::{stream::StreamTracerBuilder, udp::UdpTracerBuilder};
use serde::{Deserialize, Serialize};
use socks5::server::{
tcp::{Socks5ServerTcpAccess, Socks5ServerTcpAccessServerConfig},
udp::{Socks5ServerUdpAccess, Socks5ServerUdpAccessServerConfig},
};
use stream::{
proxy_table::StreamProxyTableBuilder,
proxy_table::{StreamProxyGroupBuildContext, StreamProxyTableBuildContext},
streams::{
http_tunnel::{HttpAccess, HttpAccessServerConfig},
tcp::{TcpAccess, TcpAccessServerConfig},
},
};
use tokio_util::sync::CancellationToken;
use udp::{proxy_table::UdpProxyTableBuilder, UdpAccess, UdpAccessServerConfig};
use udp::{
proxy_table::{UdpProxyGroupBuildContext, UdpProxyTableBuildContext},
UdpAccess, UdpAccessServerConfig,
};

pub mod socks5;
pub mod stream;
pub mod udp;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct AccessServerStream {
#[serde(default)]
pub proxy_table: HashMap<Arc<str>, StreamProxyTableBuilder>,
#[serde(default)]
pub proxy_group: HashMap<Arc<str>, StreamProxyGroupBuilder>,
}
impl Merge for AccessServerStream {
type Error = AnyError;

fn merge(self, other: Self) -> Result<Self, Self::Error>
where
Self: Sized,
{
let proxy_table = merge_map(self.proxy_table, other.proxy_table)?;
let proxy_group = merge_map(self.proxy_group, other.proxy_group)?;
Ok(Self {
proxy_table,
proxy_group,
})
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct AccessServerUdp {
#[serde(default)]
pub proxy_table: HashMap<Arc<str>, UdpProxyTableBuilder>,
#[serde(default)]
pub proxy_group: HashMap<Arc<str>, UdpProxyGroupBuilder>,
}
impl Merge for AccessServerUdp {
type Error = AnyError;

fn merge(self, other: Self) -> Result<Self, Self::Error>
where
Self: Sized,
{
let proxy_table = merge_map(self.proxy_table, other.proxy_table)?;
let proxy_group = merge_map(self.proxy_group, other.proxy_group)?;
Ok(Self {
proxy_table,
proxy_group,
})
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct AccessServerConfig {
Expand All @@ -42,13 +96,11 @@ pub struct AccessServerConfig {
#[serde(default)]
pub socks5_udp_server: Vec<Socks5ServerUdpAccessServerConfig>,
#[serde(default)]
pub stream_proxy_tables: HashMap<Arc<str>, StreamProxyTableBuilder>,
#[serde(default)]
pub udp_proxy_tables: HashMap<Arc<str>, UdpProxyTableBuilder>,
stream: AccessServerStream,
#[serde(default)]
pub matchers: HashMap<Arc<str>, MatcherBuilder>,
udp: AccessServerUdp,
#[serde(default)]
pub filters: HashMap<Arc<str>, FilterBuilder>,
pub matcher: HashMap<Arc<str>, MatcherBuilder>,
}
impl AccessServerConfig {
pub fn new() -> AccessServerConfig {
Expand All @@ -58,10 +110,9 @@ impl AccessServerConfig {
http_server: Default::default(),
socks5_tcp_server: Default::default(),
socks5_udp_server: Default::default(),
stream_proxy_tables: Default::default(),
udp_proxy_tables: Default::default(),
matchers: Default::default(),
filters: Default::default(),
stream: Default::default(),
udp: Default::default(),
matcher: Default::default(),
}
}

Expand All @@ -71,39 +122,89 @@ impl AccessServerConfig {
loader: &mut AccessServerLoader,
cancellation: CancellationToken,
context: ConcreteContext,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
stream_proxy_server: &HashMap<Arc<str>, StreamProxyConfig>,
udp_proxy_server: &HashMap<Arc<str>, UdpProxyConfig>,
) -> AnyResult {
// Shared
let matcher: HashMap<Arc<str>, filter::Matcher> = self
.matcher
.into_iter()
.map(|(k, v)| match v.build() {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
.collect::<Result<HashMap<_, _>, _>>()?;

// Stream
let stream_trace_builder = StreamTracerBuilder::new(context.stream.clone());
let stream_proxy_group_cx = StreamProxyGroupBuildContext {
proxy_server: stream_proxy_server,
tracer_builder: &stream_trace_builder,
cancellation: cancellation.clone(),
};
let stream_proxy_group = self
.stream
.proxy_group
.into_iter()
.map(|(k, v)| match v.build(stream_proxy_group_cx.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
.collect::<Result<HashMap<_, _>, _>>()?;
let stream_proxy_table_cx = StreamProxyTableBuildContext {
matcher: &matcher,
proxy_group: &stream_proxy_group,
proxy_group_cx: stream_proxy_group_cx.clone(),
};
let stream_proxy_tables = self
.stream_proxy_tables
.stream
.proxy_table
.into_iter()
.map(
|(k, v)| match v.build(stream_proxy, &context.stream, cancellation.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
},
)
.map(|(k, v)| match v.build(stream_proxy_table_cx.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
.collect::<Result<HashMap<_, _>, _>>()?;
let udp_proxy_tables = self
.udp_proxy_tables

// UDP
let udp_trace_builder = UdpTracerBuilder::new();
let udp_proxy_group_cx = UdpProxyGroupBuildContext {
proxy_server: udp_proxy_server,
tracer_builder: &udp_trace_builder,
cancellation: cancellation.clone(),
};
let udp_proxy_group = self
.udp
.proxy_group
.into_iter()
.map(|(k, v)| match v.build(udp_proxy, cancellation.clone()) {
.map(|(k, v)| match v.build(udp_proxy_group_cx.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
.collect::<Result<HashMap<_, _>, _>>()?;
let udp_proxy_table_cx = UdpProxyTableBuildContext {
matcher: &matcher,
proxy_group: &udp_proxy_group,
proxy_group_cx: udp_proxy_group_cx.clone(),
};
let _udp_proxy_tables = self
.udp
.proxy_table
.into_iter()
.map(|(k, v)| match v.build(udp_proxy_table_cx.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
.collect::<Result<HashMap<_, _>, _>>()?;
let filters = filter::build_from_map(self.matchers, self.filters)?;

// TCP servers
let tcp_server = self
.tcp_server
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
cancellation.clone(),
&stream_proxy_group,
stream_proxy_group_cx.clone(),
context.stream.clone(),
)
})
Expand All @@ -119,9 +220,8 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
udp_proxy,
&udp_proxy_tables,
cancellation.clone(),
&udp_proxy_group,
udp_proxy_group_cx.clone(),
context.udp.clone(),
)
})
Expand All @@ -137,10 +237,8 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
&filters,
cancellation.clone(),
stream_proxy_table_cx.clone(),
context.stream.clone(),
)
})
Expand All @@ -156,10 +254,8 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
&filters,
cancellation.clone(),
stream_proxy_table_cx.clone(),
context.stream.clone(),
)
})
Expand All @@ -175,9 +271,8 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
udp_proxy,
&udp_proxy_tables,
cancellation.clone(),
&udp_proxy_group,
udp_proxy_group_cx.clone(),
context.udp.clone(),
)
})
Expand All @@ -202,20 +297,18 @@ impl Merge for AccessServerConfig {
self.http_server.extend(other.http_server);
self.socks5_tcp_server.extend(other.socks5_tcp_server);
self.socks5_udp_server.extend(other.socks5_udp_server);
let stream_proxy_tables = merge_map(self.stream_proxy_tables, other.stream_proxy_tables)?;
let udp_proxy_tables = merge_map(self.udp_proxy_tables, other.udp_proxy_tables)?;
let matchers = merge_map(self.matchers, other.matchers)?;
let filters = merge_map(self.filters, other.filters)?;
let stream = self.stream.merge(other.stream)?;
let udp = self.udp.merge(other.udp)?;
let matcher = merge_map(self.matcher, other.matcher)?;
Ok(Self {
tcp_server: self.tcp_server,
udp_server: self.udp_server,
http_server: self.http_server,
socks5_tcp_server: self.socks5_tcp_server,
socks5_udp_server: self.socks5_udp_server,
stream_proxy_tables,
udp_proxy_tables,
matchers,
filters,
stream,
udp,
matcher,
})
}
}
Expand Down
Loading

0 comments on commit 78d7779

Please sign in to comment.