Skip to content

Commit eb1d00c

Browse files
committed
perf(tunnel): dynamic coalescing, O(n) upload buffer, network_preset config
Steps 3 + 4 of the perf/relay-speed series. --- Step 3: Dynamic coalescing (tunnel_client.rs + config.rs + proxy_server.rs) --- Replace static DEFAULT_COALESCE_STEP/MAX constants with Arc<AtomicU64> values stored in TunnelMux. A new RttTracker (ring buffer of last 8 batch RTTs) measures median round-trip time and auto-adjusts the coalesce window: Slow preset (median > 1200ms): step=150ms max=600ms Fast preset (median < 700ms): step= 50ms max=300ms Hysteresis: 3 consecutive sub-threshold readings required to leave Slow, preventing flapping on bursty congestion. mux_loop reads the atomics at each new window boundary so preset changes take effect without restart. Config: network_preset = "auto" (default) | "fast" | "slow" - auto starts at Fast and adapts via RttTracker - fast/slow lock the preset and skip RTT measurement Explicit coalesce_step_ms / coalesce_max_ms override still honoured when set; presence of either disables auto-adaptation. Note: TunnelMux is only started in mode=full. Mode=apps_script relays each connection directly through DomainFronter and does not go through this path. New unit tests: rtt_tracker_preset_selection_slow, rtt_tracker_preset_selection_fast, rtt_tracker_hysteresis_prevents_premature_flip. Fixed copy_bidirectional test in proxy_server.rs: a_client was moved into write_task then borrowed for reading — split into separate read/write halves before spawning so both directions can be independently asserted. --- Step 4: O(n) amortised buffered upload merge (tunnel_client.rs) --- Change buffered_upload from Option<Bytes> to Option<BytesMut>. The old merge path allocated a fresh buffer and copied all accumulated data on every new upload chunk under pipeline congestion — O(n^2) total copies for n chunks. BytesMut extends in-place (amortised O(n)); freeze() at send time is a zero-copy Arc pointer bump.
1 parent 1615576 commit eb1d00c

3 files changed

Lines changed: 362 additions & 59 deletions

File tree

src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ pub struct Config {
112112
/// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms).
113113
#[serde(default)]
114114
pub coalesce_max_ms: u16,
115+
/// Adaptive coalescing preset. One of "auto" (default), "fast", or "slow".
116+
/// "auto" measures batch RTT and switches automatically.
117+
/// "fast" uses 50ms/300ms windows — best for broadband/fiber.
118+
/// "slow" uses 150ms/600ms windows — best for slow links (Iran cable, mobile).
119+
/// Leave unset or set to "auto" for automatic detection.
120+
#[serde(default)]
121+
pub network_preset: Option<String>,
115122
/// Optional explicit SNI rotation pool for outbound TLS to `google_ip`.
116123
/// Empty / missing = auto-expand from `front_domain` (current default of
117124
/// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list
@@ -785,6 +792,8 @@ pub struct TomlRelay {
785792
#[serde(default)]
786793
pub coalesce_max_ms: u16,
787794
#[serde(default)]
795+
pub network_preset: Option<String>,
796+
#[serde(default)]
788797
pub youtube_via_relay: bool,
789798
#[serde(default)]
790799
pub normalize_x_graphql: bool,
@@ -935,6 +944,7 @@ impl From<TomlConfig> for Config {
935944
parallel_relay: t.relay.parallel_relay,
936945
coalesce_step_ms: t.relay.coalesce_step_ms,
937946
coalesce_max_ms: t.relay.coalesce_max_ms,
947+
network_preset: t.relay.network_preset,
938948
sni_hosts: t.network.sni_hosts,
939949
fetch_ips_from_api: t.scan.fetch_ips_from_api,
940950
max_ips_to_scan: t.scan.max_ips_to_scan,
@@ -977,6 +987,7 @@ impl From<&Config> for TomlConfig {
977987
enable_batching: c.enable_batching,
978988
coalesce_step_ms: c.coalesce_step_ms,
979989
coalesce_max_ms: c.coalesce_max_ms,
990+
network_preset: c.network_preset.clone(),
980991
youtube_via_relay: c.youtube_via_relay,
981992
normalize_x_graphql: c.normalize_x_graphql,
982993
disable_padding: c.disable_padding,

src/proxy_server.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ pub struct ProxyServer {
220220
tunnel_mux: Option<Arc<TunnelMux>>,
221221
coalesce_step_ms: u64,
222222
coalesce_max_ms: u64,
223+
network_preset: Option<String>,
223224
}
224225

225226
pub struct RewriteCtx {
@@ -527,6 +528,7 @@ impl ProxyServer {
527528
tunnel_mux: None, // initialized in run() inside the tokio runtime
528529
coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 10 },
529530
coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 },
531+
network_preset: config.network_preset.clone(),
530532
})
531533
}
532534

@@ -548,7 +550,7 @@ impl ProxyServer {
548550
// Initialize TunnelMux inside the runtime (tokio::spawn requires it).
549551
if self.rewrite_ctx.mode == Mode::Full {
550552
if let Some(f) = self.fronter.as_ref() {
551-
self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms));
553+
self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms, self.network_preset.as_deref()));
552554
}
553555
}
554556

@@ -3698,51 +3700,53 @@ mod tests {
36983700
assert!(FrontingGroupResolved::from_config(&bad).is_err());
36993701
}
37003702

3701-
/// Verifies that copy_bidirectional_with_sizes correctly transfers data
3702-
/// in both directions through in-memory duplex pipes.
3703+
/// Verifies that copy_bidirectional_with_sizes correctly relays data in
3704+
/// both directions. Splits each duplex into read/write halves so the
3705+
/// write task and read assertions can operate independently.
37033706
#[tokio::test(flavor = "current_thread")]
37043707
async fn copy_bidirectional_large_buf_roundtrip() {
3705-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3708+
use tokio::io::AsyncWriteExt;
37063709

37073710
// a_client <-> a_server and b_client <-> b_server
3708-
let (mut a_client, mut a_server) = tokio::io::duplex(128 * 1024);
3709-
let (mut b_client, mut b_server) = tokio::io::duplex(128 * 1024);
3711+
let (a_client, mut a_server) = tokio::io::duplex(128 * 1024);
3712+
let (b_client, mut b_server) = tokio::io::duplex(128 * 1024);
37103713

3711-
let payload_a = vec![0xAAu8; 32 * 1024]; // 32 KB a→b
3712-
let payload_b = vec![0xBBu8; 48 * 1024]; // 48 KB b→a
3714+
let (mut a_client_read, mut a_client_write) = tokio::io::split(a_client);
3715+
let (mut b_client_read, mut b_client_write) = tokio::io::split(b_client);
37133716

3714-
let pa = payload_a.clone();
3715-
let pb = payload_b.clone();
3717+
let payload_ab = vec![0xAAu8; 32 * 1024]; // a→b
3718+
let payload_ba = vec![0xBBu8; 48 * 1024]; // b→a
37163719

3717-
// Writer task: sends payloads and shuts down its write half.
3720+
let pa = payload_ab.clone();
3721+
let pb = payload_ba.clone();
3722+
3723+
// Writer task: fills both pipes and shuts down write halves so the
3724+
// relay sees EOF and can propagate the shutdown to the other side.
37183725
let write_task = tokio::spawn(async move {
3719-
a_client.write_all(&pa).await.unwrap();
3720-
a_client.shutdown().await.unwrap();
3721-
b_client.write_all(&pb).await.unwrap();
3722-
b_client.shutdown().await.unwrap();
3726+
a_client_write.write_all(&pa).await.unwrap();
3727+
a_client_write.shutdown().await.unwrap();
3728+
b_client_write.write_all(&pb).await.unwrap();
3729+
b_client_write.shutdown().await.unwrap();
37233730
});
37243731

37253732
// Relay task: bridges a_server <-> b_server with 64 KB buffers.
37263733
let relay_task = tokio::spawn(async move {
3727-
let _ = tokio::io::copy_bidirectional_with_sizes(
3728-
&mut a_server,
3729-
&mut b_server,
3730-
65536,
3731-
65536,
3732-
)
3733-
.await;
3734+
tokio::io::copy_bidirectional_with_sizes(&mut a_server, &mut b_server, 65536, 65536)
3735+
.await
3736+
.unwrap();
37343737
});
37353738

3736-
// Read what came through.
3737-
let mut got_b = Vec::new();
3738-
let read_b = tokio::io::AsyncReadExt::read_to_end(&mut a_client, &mut got_b);
3739-
// a_client write half is done; read remaining bytes from b direction.
3740-
// Reconstruct handles: duplex was already consumed above; test through
3741-
// the relay instead by reading from the write-task's pair.
3742-
// (Simplified: just verify write+relay complete without panic.)
37433739
write_task.await.unwrap();
37443740
relay_task.await.unwrap();
3745-
drop(read_b); // readers already at EOF after shutdown
3746-
// Passed: no panic, bidirectional copy completed cleanly.
3741+
3742+
// After relay finishes it has propagated each shutdown to the opposite
3743+
// side, so both client read halves are at EOF.
3744+
let mut got_at_b = Vec::new();
3745+
b_client_read.read_to_end(&mut got_at_b).await.unwrap();
3746+
assert_eq!(got_at_b, payload_ab, "data written to A must arrive at B");
3747+
3748+
let mut got_at_a = Vec::new();
3749+
a_client_read.read_to_end(&mut got_at_a).await.unwrap();
3750+
assert_eq!(got_at_a, payload_ba, "data written to B must arrive at A");
37473751
}
37483752
}

0 commit comments

Comments
 (0)