Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 159 additions & 4 deletions crates/mofa-foundation/src/scheduler/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! - Dequeues oldest-first among requests that fit in available memory

use std::collections::VecDeque;
use std::time::Instant;
use std::time::{Duration, Instant};

/// A request that was deferred due to memory pressure.
#[derive(Debug, Clone)]
Expand All @@ -20,16 +20,25 @@ pub struct DeferredRequest {
pub enqueued_at: Instant,
/// Number of retry attempts so far.
pub retry_count: u32,
/// Time-to-live: request expires after this duration if not processed.
/// Prevents "ghost" requests from filling the queue when callers disconnect.
pub ttl: Duration,
}

impl DeferredRequest {
/// Create a new deferred request.
/// Create a new deferred request with default TTL (5 minutes).
pub fn new(id: String, required_mb: u64) -> Self {
Self::with_ttl(id, required_mb, Duration::from_secs(300))
}

/// Create a new deferred request with custom TTL.
pub fn with_ttl(id: String, required_mb: u64, ttl: Duration) -> Self {
Self {
id,
required_mb,
enqueued_at: Instant::now(),
retry_count: 0,
ttl,
}
}

Expand All @@ -38,13 +47,23 @@ impl DeferredRequest {
self.enqueued_at.elapsed().as_millis() as u64
}

/// Check if this request has exceeded its TTL.
pub fn is_expired(&self) -> bool {
self.enqueued_at.elapsed() > self.ttl
}

/// Increment the retry counter.
pub fn increment_retry(&mut self) {
self.retry_count += 1;
}
}

/// Age-aware deferred queue with capacity limits.
/// Age-aware deferred queue with capacity limits and TTL-based expiry.
///
/// Prevents "ghost" requests from filling the queue when callers disconnect:
/// - Requests can be explicitly canceled with `cancel_request(id)`
/// - Requests automatically expire after their TTL (default 5 minutes)
/// - Both mechanisms prevent DoS via queue saturation
#[derive(Debug)]
pub struct DeferredQueue {
queue: VecDeque<DeferredRequest>,
Expand All @@ -71,16 +90,37 @@ impl DeferredQueue {
true
}

/// Cancel a specific request by ID and remove it from the queue.
///
/// Prevents "ghost" requests from occupied callers (e.g., disconnected clients)
/// from blocking the queue.
///
/// Returns `true` if a request was removed, `false` if not found.
pub fn cancel_request(&mut self, request_id: &str) -> bool {
if let Some(pos) = self.queue.iter().position(|r| r.id == request_id) {
self.queue.remove(pos).is_some()
} else {
false
}
}

/// Dequeue the **oldest** request that fits in `available_mb`.
///
/// This is the fairness policy: we scan from oldest to newest and
/// take the first request whose memory requirement is satisfied,
/// preventing starvation of small requests behind large ones.
///
/// Skips requests that have exceeded their TTL or max retries.
pub fn dequeue_oldest_fitting(&mut self, available_mb: u64) -> Option<DeferredRequest> {
let mut best_idx: Option<usize> = None;

for (i, req) in self.queue.iter().enumerate() {
if req.required_mb <= available_mb && req.retry_count < self.max_retries {
// Skip expired requests (either by TTL or max retries)
if req.is_expired() || req.retry_count >= self.max_retries {
continue;
}

if req.required_mb <= available_mb {
best_idx = Some(i);
break; // oldest first
}
Expand All @@ -102,6 +142,22 @@ impl DeferredQueue {
expired
}

/// Remove and return all requests that have exceeded their TTL.
///
/// Automatically called periodically by the scheduler to prevent
/// "ghost" requests from filling the queue when callers disconnect.
pub fn drain_ttl_expired(&mut self) -> Vec<DeferredRequest> {
let expired: Vec<DeferredRequest> = self
.queue
.iter()
.filter(|r| r.is_expired())
.cloned()
.collect();

self.queue.retain(|r| !r.is_expired());
expired
}

/// Current number of requests in the queue.
pub fn len(&self) -> usize {
self.queue.len()
Expand Down Expand Up @@ -182,4 +238,103 @@ mod tests {
let result = q.dequeue_oldest_fitting(10_000);
assert!(result.is_none());
}

#[test]
fn test_cancel_request() {
let mut q = DeferredQueue::new(10, 3);
q.enqueue(DeferredRequest::new("r1".into(), 100));
q.enqueue(DeferredRequest::new("r2".into(), 100));
assert_eq!(q.len(), 2);

// Cancel a request that exists
assert!(q.cancel_request("r1"));
assert_eq!(q.len(), 1);

// Try to cancel non-existent request
assert!(!q.cancel_request("r3"));
assert_eq!(q.len(), 1);

// Verify r2 is still there
let remaining = q.dequeue_oldest_fitting(10_000);
assert_eq!(remaining.unwrap().id, "r2");
}

#[test]
fn test_ttl_expiry() {
let mut q = DeferredQueue::new(10, 3);

// Create request with very short TTL (1ms)
let req = DeferredRequest::with_ttl("r1".into(), 100, Duration::from_millis(1));
q.enqueue(req);

// Immediately after enqueue, should not be expired
assert!(!q.dequeue_oldest_fitting(10_000).is_none());

// Create new request with short TTL and wait
let req2 = DeferredRequest::with_ttl("r2".into(), 100, Duration::from_millis(10));
q.enqueue(req2);

// Wait for TTL to expire
std::thread::sleep(std::time::Duration::from_millis(50));

// Request should now be expired and not dequeued
assert!(q.dequeue_oldest_fitting(10_000).is_none());

// But drain_ttl_expired should find it
let expired = q.drain_ttl_expired();
assert_eq!(expired.len(), 1);
assert_eq!(expired[0].id, "r2");
}

#[test]
fn test_drain_ttl_expired() {
let mut q = DeferredQueue::new(10, 3);

// Add request with long TTL
q.enqueue(DeferredRequest::with_ttl(
"r1".into(),
100,
Duration::from_secs(300),
));

// Add request with short TTL
q.enqueue(DeferredRequest::with_ttl(
"r2".into(),
100,
Duration::from_millis(1),
));

// Wait for short TTL to expire
std::thread::sleep(std::time::Duration::from_millis(50));

// Only r2 should be expired
let expired = q.drain_ttl_expired();
assert_eq!(expired.len(), 1);
assert_eq!(expired[0].id, "r2");
assert_eq!(q.len(), 1); // r1 still in queue
}

#[test]
fn test_ghost_request_prevention() {
// Simulate the fix #1569 scenario:
// 1. Submit requests that exceed defer_threshold
// 2. Callers disconnect (simulate with cancel_request)
// 3. New legitimate requests should still enqueue
let mut q = DeferredQueue::new(100, 10);

// Fill with "ghost" requests from disconnected callers
for i in 0..50 {
q.enqueue(DeferredRequest::new(format!("ghost-{}", i), 100));
}
assert_eq!(q.len(), 50);

// Callers disconnect - we cancel these "ghost" requests
for i in 0..50 {
assert!(q.cancel_request(&format!("ghost-{}", i)));
}

// Queue should now be empty, allowing new legitimate requests
assert!(q.is_empty());
assert!(q.enqueue(DeferredRequest::new("legitimate".into(), 100)));
}
}
Loading