@@ -1403,6 +1403,8 @@ struct InflightMeta {
14031403struct InflightEntry {
14041404 meta : InflightMeta ,
14051405 reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ,
1406+ /// Held until reply is processed — releases upload flow control permit.
1407+ _upload_permit : Option < tokio:: sync:: OwnedSemaphorePermit > ,
14061408}
14071409
14081410/// Upload task: reads from the client socket, accumulates data (50ms
@@ -1421,6 +1423,7 @@ async fn upload_task(
14211423 eof_seen : Arc < AtomicBool > ,
14221424 inflight_tx : mpsc:: UnboundedSender < InflightEntry > ,
14231425 initial_data : Option < Bytes > ,
1426+ upload_sem : Arc < Semaphore > ,
14241427) {
14251428 const READ_CHUNK : usize = 512 * 1024 ;
14261429 let mut buf = vec ! [ 0u8 ; READ_CHUNK ] ;
@@ -1450,6 +1453,7 @@ async fn upload_task(
14501453 let entry = InflightEntry {
14511454 meta : InflightMeta { seq, was_empty_poll : false , send_at } ,
14521455 reply_rx,
1456+ _upload_permit : None , // initial data — no flow control
14531457 } ;
14541458 if inflight_tx. send ( entry) . is_err ( ) {
14551459 return ; // download task gone
@@ -1509,6 +1513,9 @@ async fn upload_task(
15091513
15101514 if data. is_empty ( ) { continue ; }
15111515
1516+ // Flow control: wait for a permit before sending.
1517+ let permit = upload_sem. clone ( ) . acquire_owned ( ) . await . unwrap ( ) ;
1518+
15121519 let seq = next_send_seq. fetch_add ( 1 , Ordering :: Relaxed ) ;
15131520 let wseq = next_data_write_seq;
15141521 next_data_write_seq += 1 ;
@@ -1528,6 +1535,7 @@ async fn upload_task(
15281535 let entry = InflightEntry {
15291536 meta : InflightMeta { seq, was_empty_poll : false , send_at } ,
15301537 reply_rx,
1538+ _upload_permit : Some ( permit) ,
15311539 } ;
15321540 if inflight_tx. send ( entry) . is_err ( ) {
15331541 break ; // download task gone
@@ -1558,6 +1566,7 @@ async fn tunnel_loop(
15581566 // the mux, and forwards InflightEntry to the download task.
15591567 // Pending client data is seeded as the first send inside the upload
15601568 // task via an initial_data parameter.
1569+ let upload_sem = Arc :: new ( Semaphore :: new ( 3 ) ) ; // max 3 unacked upload ops
15611570 let _upload_handle = tokio:: spawn ( upload_task (
15621571 reader,
15631572 sid. to_string ( ) ,
@@ -1567,6 +1576,7 @@ async fn tunnel_loop(
15671576 eof_seen. clone ( ) ,
15681577 inflight_tx, // move the only sender to the upload task
15691578 pending_client_data. clone ( ) ,
1579+ upload_sem,
15701580 ) ) ;
15711581 // The download task does NOT hold an inflight_tx clone — when the
15721582 // upload task exits and drops the sender, inflight_rx.recv() returns
@@ -1593,14 +1603,20 @@ async fn tunnel_loop(
15931603 let mut inflight: FuturesUnordered < ReplyFut > = FuturesUnordered :: new ( ) ;
15941604
15951605 // Helper: wrap a reply_rx into a ReplyFut with timeout.
1596- fn wrap_reply ( meta : InflightMeta , reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ) -> std:: pin:: Pin < Box < dyn std:: future:: Future < Output = ( InflightMeta , ReplyOutcome ) > + Send > > {
1606+ fn wrap_reply (
1607+ meta : InflightMeta ,
1608+ reply_rx : oneshot:: Receiver < Result < ( TunnelResponse , String ) , String > > ,
1609+ permit : Option < tokio:: sync:: OwnedSemaphorePermit > ,
1610+ ) -> std:: pin:: Pin < Box < dyn std:: future:: Future < Output = ( InflightMeta , ReplyOutcome ) > + Send > > {
15971611 Box :: pin ( async move {
1598- match tokio:: time:: timeout ( REPLY_TIMEOUT , reply_rx) . await {
1612+ let result = match tokio:: time:: timeout ( REPLY_TIMEOUT , reply_rx) . await {
15991613 Ok ( Ok ( Ok ( ( r, sid) ) ) ) => ( meta, ReplyOutcome :: Ok ( r, sid) ) ,
16001614 Ok ( Ok ( Err ( e) ) ) => ( meta, ReplyOutcome :: BatchErr ( e) ) ,
16011615 Ok ( Err ( _) ) => ( meta, ReplyOutcome :: Dropped ) ,
16021616 Err ( _) => ( meta, ReplyOutcome :: Timeout ) ,
1603- }
1617+ } ;
1618+ drop ( permit) ; // release upload flow control permit AFTER reply
1619+ result
16041620 } )
16051621 }
16061622
@@ -1649,7 +1665,7 @@ async fn tunnel_loop(
16491665 & sid[ ..sid. len( ) . min( 8 ) ] ,
16501666 entry. meta. seq,
16511667 ) ;
1652- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1668+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
16531669 }
16541670 None => {
16551671 // Upload task exited before sending — nothing to do.
@@ -1676,13 +1692,14 @@ async fn tunnel_loop(
16761692 meta. seq,
16771693 inflight. len( ) + 1 ,
16781694 ) ;
1679- inflight. push ( wrap_reply ( meta, reply_rx) ) ;
1695+ inflight. push ( wrap_reply ( meta, reply_rx, None ) ) ;
16801696 }
16811697 }
16821698
16831699 // Timer for staggered refill polls — fires in the select, never blocks.
16841700 let mut refill_at: Option < std:: pin:: Pin < Box < tokio:: time:: Sleep > > > = None ;
1685- let mut refill_steps: u32 = 0 ; // counts 100ms steps; poll after 10 (1s)
1701+ let mut refill_steps: u32 = 0 ;
1702+ let mut data_ops_in_flight: u32 = 0 ;
16861703
16871704 // Schedule initial refill if pre-fill didn't fill all slots.
16881705 if inflight. len ( ) < max_inflight {
@@ -1744,7 +1761,7 @@ async fn tunnel_loop(
17441761 & sid[ ..sid. len( ) . min( 8 ) ] ,
17451762 entry. meta. seq,
17461763 ) ;
1747- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1764+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
17481765 continue ;
17491766 }
17501767 None => {
@@ -1759,16 +1776,17 @@ async fn tunnel_loop(
17591776 tracing:: debug!(
17601777 "sess {}: keepalive poll seq={}" , & sid[ ..sid. len( ) . min( 8 ) ] , meta. seq
17611778 ) ;
1762- inflight. push ( wrap_reply ( meta, reply_rx) ) ;
1779+ inflight. push ( wrap_reply ( meta, reply_rx, None ) ) ;
17631780 }
17641781
17651782 // Drain any InflightEntry items from the upload task before
17661783 // entering the select — non-blocking.
17671784 while let Ok ( entry) = inflight_rx. try_recv ( ) {
17681785 if !entry. meta . was_empty_poll {
17691786 consecutive_empty = 0 ;
1787+ data_ops_in_flight += 1 ;
17701788 }
1771- inflight. push ( wrap_reply ( entry. meta , entry. reply_rx ) ) ;
1789+ inflight. push ( wrap_reply ( entry. meta , entry. reply_rx , entry . _upload_permit ) ) ;
17721790 }
17731791
17741792 tokio:: select! {
@@ -1778,13 +1796,14 @@ async fn tunnel_loop(
17781796 Some ( entry) = inflight_rx. recv( ) => {
17791797 if !entry. meta. was_empty_poll {
17801798 consecutive_empty = 0 ;
1799+ data_ops_in_flight += 1 ;
17811800 }
1782- inflight. push( wrap_reply( entry. meta, entry. reply_rx) ) ;
1801+ inflight. push( wrap_reply( entry. meta, entry. reply_rx, entry . _upload_permit ) ) ;
17831802 }
17841803
17851804 // Refill timer: 100ms steps, send empty poll after 10 steps
17861805 // (1s) for batch separation.
1787- _ = async { refill_at. as_mut( ) . unwrap( ) . await } , if refill_at. is_some( ) => {
1806+ _ = async { refill_at. as_mut( ) . unwrap( ) . await } , if refill_at. is_some( ) && data_ops_in_flight == 0 => {
17881807 refill_at = None ;
17891808 if !eof_seen. load( Ordering :: Relaxed )
17901809 && inflight. len( ) < max_inflight
@@ -1794,7 +1813,7 @@ async fn tunnel_loop(
17941813 if refill_steps >= 10 {
17951814 let ( meta, reply_rx, send_fut) = send_empty_poll( sid, & next_send_seq, mux) ;
17961815 send_fut. await ;
1797- inflight. push( wrap_reply( meta, reply_rx) ) ;
1816+ inflight. push( wrap_reply( meta, reply_rx, None ) ) ;
17981817 refill_steps = 0 ;
17991818
18001819 if inflight. len( ) < max_inflight && max_inflight > INFLIGHT_IDLE {
@@ -1810,6 +1829,9 @@ async fn tunnel_loop(
18101829 Some ( ( meta, outcome) ) = inflight. next( ) => {
18111830 match outcome {
18121831 ReplyOutcome :: Ok ( resp, script_id) => {
1832+ if !meta. was_empty_poll {
1833+ data_ops_in_flight = data_ops_in_flight. saturating_sub( 1 ) ;
1834+ }
18131835 let has_data = resp. d. as_ref( ) . map( |d| !d. is_empty( ) ) . unwrap_or( false ) ;
18141836 tracing:: debug!(
18151837 "sess {}: recv seq={}, rtt={:?}, data={}, inflight={}" ,
@@ -1933,7 +1955,7 @@ async fn tunnel_loop(
19331955 // Schedule refill if pipeline needs more polls.
19341956 if !eof_seen. load( Ordering :: Relaxed )
19351957 && inflight. len( ) < max_inflight
1936- // consecutive_empty gate removed — keep polling
1958+ && data_ops_in_flight == 0
19371959 && refill_at. is_none( )
19381960 {
19391961 refill_at = Some ( Box :: pin( tokio:: time:: sleep(
0 commit comments