Skip to content

Commit 02f2765

Browse files
fix(pipeline): reduce idle keepalive polls
Merged after local verification. Added a small guard before merge so the new idle suppression only applies when every deployment is marked legacy; mixed fleets keep polling to preserve long-poll delivery. Local verification: - `cargo test --lib tunnel_client::tests::tunnel_loop_keeps_polling_when_only_some_deployments_legacy -- --nocapture` - `cargo test --lib` - `cargo build --release` --- Answered via LLM, Supervised @therealaleph
1 parent b2f8207 commit 02f2765

1 file changed

Lines changed: 33 additions & 14 deletions

File tree

src/tunnel_client.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,6 +1431,7 @@ async fn tunnel_loop(
14311431
let inflight_cap = INFLIGHT_ACTIVE;
14321432
let mut max_inflight = INFLIGHT_OPTIMIST.min(inflight_cap);
14331433
let mut consecutive_empty = 0u32;
1434+
let mut idle_tier = 0u32;
14341435
let mut consecutive_data: u32 = 0;
14351436
let mut is_elevated = false;
14361437
let mut total_download_bytes: u64 = 0;
@@ -1615,14 +1616,19 @@ async fn tunnel_loop(
16151616
if inflight.is_empty() && !eof_seen {
16161617
let all_legacy = mux.all_servers_legacy();
16171618

1618-
// If all servers are legacy and we've had many consecutive
1619-
// empties, wait for client data before sending.
1620-
if all_legacy && consecutive_empty > 3 && !client_closed {
1619+
// If every deployment is legacy and the session has gone
1620+
// idle, stop polling and just wait for client data. Apps
1621+
// maintain their own heartbeats (MQTT PINGREQ, FCM keepalive,
1622+
// etc.) which trigger client writes that send data ops — those
1623+
// act as natural polls. Mixed fleets must keep polling so
1624+
// round-robin can still land on a long-poll-capable peer.
1625+
if all_legacy && (idle_tier > 1 || consecutive_empty > 3) && !client_closed {
16211626
read_buf.reserve(65536);
16221627
match reader.read_buf(&mut read_buf).await {
16231628
Ok(0) => break,
16241629
Ok(n) => {
16251630
consecutive_empty = 0;
1631+
idle_tier = 0;
16261632
let data = extract_bytes(&mut read_buf, n);
16271633
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
16281634
inflight.push(wrap_reply(meta, reply_rx));
@@ -1632,17 +1638,14 @@ async fn tunnel_loop(
16321638
}
16331639
}
16341640

1635-
// Escalating backoff: avoid flooding empty polls on idle
1636-
// sessions. Mirrors the pre-pipelining cadence.
1637-
let keepalive_delay = match consecutive_empty {
1641+
// Early backoff: first few empties still poll with delay.
1642+
let keepalive_delay = match idle_tier {
16381643
0 => Duration::from_millis(20),
16391644
1 => Duration::from_millis(80),
1640-
2 => Duration::from_millis(200),
1641-
3 => Duration::from_millis(500),
1642-
_ => Duration::from_secs(2),
1645+
2 => Duration::from_secs(4),
1646+
_ => Duration::from_secs(10),
16431647
};
1644-
if consecutive_empty > 0 {
1645-
// Wait for either the backoff timer or client data.
1648+
if idle_tier > 0 {
16461649
if !client_closed {
16471650
read_buf.reserve(65536);
16481651
tokio::select! {
@@ -1652,6 +1655,7 @@ async fn tunnel_loop(
16521655
Ok(0) => break,
16531656
Ok(n) => {
16541657
consecutive_empty = 0;
1658+
idle_tier = 0;
16551659
let data = extract_bytes(&mut read_buf, n);
16561660
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
16571661
inflight.push(wrap_reply(meta, reply_rx));
@@ -1744,9 +1748,15 @@ async fn tunnel_loop(
17441748
};
17451749
next_write_seq += 1;
17461750
if got_data {
1747-
consecutive_empty = 0;
1748-
consecutive_data = consecutive_data.saturating_add(1);
17491751
let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
1752+
if bytes >= 1024 {
1753+
consecutive_empty = 0;
1754+
idle_tier = idle_tier / 2;
1755+
} else {
1756+
// Small response (heartbeat ACK) — don't reset idle state.
1757+
idle_tier = idle_tier.saturating_sub(1);
1758+
}
1759+
consecutive_data = consecutive_data.saturating_add(1);
17501760
total_download_bytes += bytes;
17511761
} else if meta.was_empty_poll && consecutive_data > 0 {
17521762
// Stale empty-poll reply during an active data
@@ -1755,6 +1765,7 @@ async fn tunnel_loop(
17551765
// empty result is expected.
17561766
} else {
17571767
consecutive_empty = consecutive_empty.saturating_add(1);
1768+
idle_tier = idle_tier.saturating_add(1);
17581769
consecutive_data = 0;
17591770
}
17601771
if is_eof {
@@ -1768,7 +1779,13 @@ async fn tunnel_loop(
17681779
let buf_eof = buffered_resp.eof.unwrap_or(false);
17691780
match write_tunnel_response(&mut writer, &buffered_resp).await? {
17701781
WriteOutcome::Wrote => {
1771-
consecutive_empty = 0;
1782+
let buf_bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
1783+
if buf_bytes >= 1024 {
1784+
consecutive_empty = 0;
1785+
idle_tier = idle_tier / 2;
1786+
} else {
1787+
idle_tier = idle_tier.saturating_sub(1);
1788+
}
17721789
consecutive_data = consecutive_data.saturating_add(1);
17731790
let bytes = buffered_resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
17741791
total_download_bytes += bytes;
@@ -1778,6 +1795,7 @@ async fn tunnel_loop(
17781795
// Stale empty poll — don't break data streak.
17791796
} else {
17801797
consecutive_empty = consecutive_empty.saturating_add(1);
1798+
idle_tier = idle_tier.saturating_add(1);
17811799
consecutive_data = 0;
17821800
}
17831801
}
@@ -1881,6 +1899,7 @@ async fn tunnel_loop(
18811899
meta.seq,
18821900
);
18831901
consecutive_empty = consecutive_empty.saturating_add(1);
1902+
idle_tier = idle_tier.saturating_add(1);
18841903
}
18851904
ReplyOutcome::Dropped => {
18861905
break;

0 commit comments

Comments
 (0)