Skip to content

Commit

Permalink
feat: keyed proxy config
Browse files Browse the repository at this point in the history
  • Loading branch information
Banyc committed Feb 14, 2024
1 parent af7ddf5 commit 8604252
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 39 deletions.
31 changes: 26 additions & 5 deletions access_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use common::{
error::AnyResult,
filter::{self, FilterBuilder, MatcherBuilder},
loading,
stream::proxy_table::StreamProxyConfig,
udp::proxy_table::UdpProxyConfig,
};
use protocol::context::ConcreteContext;
use protocol::{context::ConcreteContext, stream::addr::ConcreteStreamType};
use serde::{Deserialize, Serialize};
use socks5::server::{
tcp::{Socks5ServerTcpAccess, Socks5ServerTcpAccessServerConfig},
Expand Down Expand Up @@ -69,13 +71,15 @@ impl AccessServerConfig {
loader: &mut AccessServerLoader,
cancellation: CancellationToken,
context: ConcreteContext,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
) -> AnyResult {
// Shared
let stream_proxy_tables = self
.stream_proxy_tables
.into_iter()
.map(
|(k, v)| match v.build(&context.stream, cancellation.clone()) {
|(k, v)| match v.build(stream_proxy, &context.stream, cancellation.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
},
Expand All @@ -84,7 +88,7 @@ impl AccessServerConfig {
let udp_proxy_tables = self
.udp_proxy_tables
.into_iter()
.map(|(k, v)| match v.build(cancellation.clone()) {
.map(|(k, v)| match v.build(udp_proxy, cancellation.clone()) {
Ok(v) => Ok((k, v)),
Err(e) => Err(e),
})
Expand All @@ -97,6 +101,7 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
cancellation.clone(),
context.stream.clone(),
Expand All @@ -112,7 +117,14 @@ impl AccessServerConfig {
let udp_server = self
.udp_server
.into_iter()
.map(|c| c.into_builder(&udp_proxy_tables, cancellation.clone(), context.udp.clone()))
.map(|c| {
c.into_builder(
udp_proxy,
&udp_proxy_tables,
cancellation.clone(),
context.udp.clone(),
)
})
.collect::<Result<Vec<_>, _>>()?;
loader
.udp_server
Expand All @@ -125,6 +137,7 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
&filters,
cancellation.clone(),
Expand All @@ -143,6 +156,7 @@ impl AccessServerConfig {
.into_iter()
.map(|c| {
c.into_builder(
stream_proxy,
&stream_proxy_tables,
&filters,
cancellation.clone(),
Expand All @@ -159,7 +173,14 @@ impl AccessServerConfig {
let socks5_udp_server = self
.socks5_udp_server
.into_iter()
.map(|c| c.into_builder(&udp_proxy_tables, cancellation.clone(), context.udp.clone()))
.map(|c| {
c.into_builder(
udp_proxy,
&udp_proxy_tables,
cancellation.clone(),
context.udp.clone(),
)
})
.collect::<Result<Vec<_>, _>>()?;
loader
.socks5_udp_server
Expand Down
5 changes: 3 additions & 2 deletions access_server/src/socks5/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use common::{
stream::{
addr::StreamAddr,
io_copy::{CopyBidirectional, MetricContext},
proxy_table::StreamProxyTable,
proxy_table::{StreamProxyConfig, StreamProxyTable},
IoAddr, IoStream, StreamServerHook,
},
};
Expand Down Expand Up @@ -50,6 +50,7 @@ pub struct Socks5ServerTcpAccessServerConfig {
impl Socks5ServerTcpAccessServerConfig {
pub fn into_builder(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
proxy_tables: &HashMap<Arc<str>, StreamProxyTable<ConcreteStreamType>>,
filters: &HashMap<Arc<str>, Filter>,
cancellation: CancellationToken,
Expand All @@ -60,7 +61,7 @@ impl Socks5ServerTcpAccessServerConfig {
.get(&key)
.ok_or_else(|| BuildError::ProxyTableKeyNotFound(key.clone()))?
.clone(),
SharableConfig::Private(x) => x.build(&stream_context, cancellation)?,
SharableConfig::Private(x) => x.build(stream_proxy, &stream_context, cancellation)?,
};
let filter = match self.filter {
SharableConfig::SharingKey(key) => filters
Expand Down
5 changes: 3 additions & 2 deletions access_server/src/socks5/server/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use common::{
udp::{
context::UdpContext,
io_copy::{CopyBidirectional, DownstreamParts, UpstreamParts},
proxy_table::UdpProxyTable,
proxy_table::{UdpProxyConfig, UdpProxyTable},
FlowOwnedGuard, Packet, UdpDownstreamWriter, UdpServer, UdpServerHook, UpstreamAddr,
},
};
Expand All @@ -34,6 +34,7 @@ pub struct Socks5ServerUdpAccessServerConfig {
impl Socks5ServerUdpAccessServerConfig {
pub fn into_builder(
self,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
proxy_tables: &HashMap<Arc<str>, UdpProxyTable>,
cancellation: CancellationToken,
udp_context: UdpContext,
Expand All @@ -43,7 +44,7 @@ impl Socks5ServerUdpAccessServerConfig {
.get(&key)
.ok_or_else(|| BuildError::ProxyTableKeyNotFound(key.clone()))?
.clone(),
SharableConfig::Private(x) => x.build(cancellation)?,
SharableConfig::Private(x) => x.build(udp_proxy, cancellation)?,
};

Ok(Socks5ServerUdpAccessServerBuilder {
Expand Down
8 changes: 5 additions & 3 deletions access_server/src/stream/proxy_table.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::num::NonZeroUsize;
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use common::{
proxy_table::{ProxyTable, ProxyTableError},
stream::proxy_table::{
StreamProxyConfigBuildError, StreamProxyTable, StreamWeightedProxyChainBuilder,
StreamProxyConfig, StreamProxyConfigBuildError, StreamProxyTable,
StreamWeightedProxyChainBuilder,
},
};
use protocol::stream::{
Expand All @@ -26,13 +27,14 @@ pub struct StreamProxyTableBuilder {
impl StreamProxyTableBuilder {
pub fn build(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
stream_context: &ConcreteStreamContext,
cancellation: CancellationToken,
) -> Result<StreamProxyTable<ConcreteStreamType>, StreamProxyTableBuildError> {
let chains = self
.chains
.into_iter()
.map(|c| c.build())
.map(|c| c.build(stream_proxy))
.collect::<Result<_, _>>()
.map_err(StreamProxyTableBuildError::ChainConfig)?;
let tracer = match self.trace_rtt {
Expand Down
5 changes: 3 additions & 2 deletions access_server/src/stream/streams/http_tunnel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use common::{
addr::StreamAddr,
io_copy::{CopyBidirectional, MetricContext, DEAD_SESSION_RETENTION_DURATION},
metrics::{SimplifiedStreamMetrics, SimplifiedStreamProxyMetrics, StreamRecord},
proxy_table::StreamProxyTable,
proxy_table::{StreamProxyConfig, StreamProxyTable},
session_table::{Session, StreamSessionTable},
IoAddr, IoStream, StreamServerHook,
},
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct HttpAccessServerConfig {
impl HttpAccessServerConfig {
pub fn into_builder(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
proxy_tables: &HashMap<Arc<str>, StreamProxyTable<ConcreteStreamType>>,
filters: &HashMap<Arc<str>, Filter>,
cancellation: CancellationToken,
Expand All @@ -59,7 +60,7 @@ impl HttpAccessServerConfig {
.get(&key)
.ok_or_else(|| BuildError::ProxyTableKeyNotFound(key.clone()))?
.clone(),
SharableConfig::Private(x) => x.build(&stream_context, cancellation)?,
SharableConfig::Private(x) => x.build(stream_proxy, &stream_context, cancellation)?,
};
let filter = match self.filter {
SharableConfig::SharingKey(key) => filters
Expand Down
5 changes: 3 additions & 2 deletions access_server/src/stream/streams/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use common::{
loading,
stream::{
io_copy::{CopyBidirectional, MetricContext},
proxy_table::StreamProxyTable,
proxy_table::{StreamProxyConfig, StreamProxyTable},
IoAddr, IoStream, StreamServerHook,
},
};
Expand Down Expand Up @@ -36,6 +36,7 @@ pub struct TcpAccessServerConfig {
impl TcpAccessServerConfig {
pub fn into_builder(
self,
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ConcreteStreamType>>,
proxy_tables: &HashMap<Arc<str>, StreamProxyTable<ConcreteStreamType>>,
cancellation: CancellationToken,
stream_context: ConcreteStreamContext,
Expand All @@ -45,7 +46,7 @@ impl TcpAccessServerConfig {
.get(&key)
.ok_or_else(|| BuildError::ProxyTableKeyNotFound(key.clone()))?
.clone(),
SharableConfig::Private(x) => x.build(&stream_context, cancellation)?,
SharableConfig::Private(x) => x.build(stream_proxy, &stream_context, cancellation)?,
};

Ok(TcpAccessServerBuilder {
Expand Down
8 changes: 4 additions & 4 deletions access_server/src/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use std::{collections::HashMap, io, sync::Arc};

use async_speed_limit::Limiter;
use common::{
addr::InternetAddr,
addr::InternetAddrStr,
addr::{InternetAddr, InternetAddrStr},
config::SharableConfig,
loading,
udp::{
context::UdpContext,
io_copy::{CopyBiError, CopyBidirectional, DownstreamParts, UpstreamParts},
proxy_table::UdpProxyTable,
proxy_table::{UdpProxyConfig, UdpProxyTable},
FlowOwnedGuard, Packet, UdpDownstreamWriter, UdpServer, UdpServerHook, UpstreamAddr,
},
};
Expand All @@ -36,6 +35,7 @@ pub struct UdpAccessServerConfig {
impl UdpAccessServerConfig {
pub fn into_builder(
self,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
proxy_tables: &HashMap<Arc<str>, UdpProxyTable>,
cancellation: CancellationToken,
udp_context: UdpContext,
Expand All @@ -45,7 +45,7 @@ impl UdpAccessServerConfig {
.get(&key)
.ok_or_else(|| BuildError::ProxyTableKeyNotFound(key.clone()))?
.clone(),
SharableConfig::Private(x) => x.build(cancellation.clone())?,
SharableConfig::Private(x) => x.build(udp_proxy, cancellation.clone())?,
};

Ok(UdpAccessServerBuilder {
Expand Down
9 changes: 6 additions & 3 deletions access_server/src/udp/proxy_table.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::num::NonZeroUsize;
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use common::{
proxy_table::{ProxyTable, ProxyTableError},
udp::proxy_table::{UdpProxyConfigBuildError, UdpProxyTable, UdpWeightedProxyChainBuilder},
udp::proxy_table::{
UdpProxyConfig, UdpProxyConfigBuildError, UdpProxyTable, UdpWeightedProxyChainBuilder,
},
};
use proxy_client::udp::UdpTracer;
use serde::{Deserialize, Serialize};
Expand All @@ -20,12 +22,13 @@ pub struct UdpProxyTableBuilder {
impl UdpProxyTableBuilder {
pub fn build(
self,
udp_proxy: &HashMap<Arc<str>, UdpProxyConfig>,
cancellation: CancellationToken,
) -> Result<UdpProxyTable, UdpProxyTableBuildError> {
let chains = self
.chains
.into_iter()
.map(|c| c.build())
.map(|c| c.build(udp_proxy))
.collect::<Result<_, _>>()
.map_err(UdpProxyTableBuildError::ChainConfig)?;
let tracer = match self.trace_rtt {
Expand Down
30 changes: 26 additions & 4 deletions common/src/stream/pool.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::{io, marker::PhantomData, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashMap, io, marker::PhantomData, net::SocketAddr, sync::Arc, time::Duration,
};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_conn_pool::{ConnPool, ConnPoolEntry};

use crate::{
config::SharableConfig,
header::heartbeat::send_noop,
proxy_table::ProxyConfig,
stream::{
Expand All @@ -18,6 +21,7 @@ use super::{
addr::{StreamAddr, StreamAddrStr, StreamType},
connect::StreamConnectorTable,
context::StreamContext,
proxy_table::StreamProxyConfig,
IoStream,
};

Expand All @@ -26,7 +30,9 @@ const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(bound(deserialize = "SAS: Deserialize<'de>"))]
pub struct PoolBuilder<SAS>(#[serde(default)] pub Vec<StreamProxyConfigBuilder<SAS>>);
pub struct PoolBuilder<SAS>(
#[serde(default)] pub Vec<SharableConfig<StreamProxyConfigBuilder<SAS>>>,
);
impl<SAS> PoolBuilder<SAS> {
pub fn new() -> Self {
Self(vec![])
Expand All @@ -40,15 +46,24 @@ where
pub fn build<C, CT>(
self,
connector_table: CT,
) -> Result<ConnPool<StreamAddr<ST>, C>, StreamProxyConfigBuildError>
stream_proxy: &HashMap<Arc<str>, StreamProxyConfig<ST>>,
) -> Result<ConnPool<StreamAddr<ST>, C>, PoolBuildError>
where
C: std::fmt::Debug + IoStream,
CT: StreamConnectorTable<Connection = C, StreamType = ST>,
{
let c = self
.0
.into_iter()
.map(|c| c.build::<ST>())
.map(|c| match c {
SharableConfig::SharingKey(k) => stream_proxy
.get(&k)
.cloned()
.ok_or_else(|| PoolBuildError::KeyNotFound(k)),
SharableConfig::Private(c) => c
.build::<ST>()
.map_err(PoolBuildError::StreamProxyConfigBuild),
})
.collect::<Result<Vec<_>, _>>()?;
let entries = pool_entries_from_proxy_configs(c.into_iter(), connector_table.clone());
let pool = ConnPool::new(entries);
Expand All @@ -60,6 +75,13 @@ impl<SAS> Default for PoolBuilder<SAS> {
Self::new()
}
}
#[derive(Debug, Error)]
pub enum PoolBuildError {
#[error("{0}")]
StreamProxyConfigBuild(#[from] StreamProxyConfigBuildError),
#[error("Key not found: {0}")]
KeyNotFound(Arc<str>),
}

fn pool_entries_from_proxy_configs<C, CT: Clone, ST>(
proxy_configs: impl Iterator<Item = ProxyConfig<StreamAddr<ST>>>,
Expand Down
Loading

0 comments on commit 8604252

Please sign in to comment.