Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 22 additions & 71 deletions DoWhiz_service/run_task_module/src/run_task/pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
//! - Scheduler polls completion queue, then calls replenish()

use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::sync::{Mutex, OnceLock};
use uuid::Uuid;

const DEFAULT_POOL_SIZE: usize = 5;
Expand Down Expand Up @@ -50,7 +49,6 @@ pub struct PoolConfig {
/// Manages a pool of warm ACI containers.
pub struct PoolManager {
config: PoolConfig,
active_count: Arc<AtomicUsize>,
target_size: usize,
/// Tokio runtime handle captured during initialization for use in sync contexts
runtime_handle: OnceLock<tokio::runtime::Handle>,
Expand All @@ -63,7 +61,6 @@ impl PoolManager {
pub fn new(config: PoolConfig, target_size: Option<usize>) -> Self {
Self {
config,
active_count: Arc::new(AtomicUsize::new(0)),
target_size: target_size.unwrap_or(DEFAULT_POOL_SIZE),
runtime_handle: OnceLock::new(),
replenish_lock: Mutex::new(()),
Expand All @@ -82,8 +79,6 @@ impl PoolManager {

// Count existing warm containers
let existing_count = count_existing_containers(&self.config.resource_group)?;
self.active_count.store(existing_count, Ordering::SeqCst);

let containers_needed = self.target_size.saturating_sub(existing_count);

eprintln!(
Expand All @@ -104,37 +99,29 @@ impl PoolManager {
}));
}

let mut provisioned = 0;
for handle in handles {
match handle.await {
Ok(Ok(name)) => {
self.active_count.fetch_add(1, Ordering::SeqCst);
provisioned += 1;
eprintln!("[pool_manager] Provisioned: {}", name);
}
Ok(Err(e)) => eprintln!("[pool_manager] Provision failed: {}", e),
Err(e) => eprintln!("[pool_manager] Task join failed: {}", e),
}
}

let final_count = self.active_count.load(Ordering::SeqCst);
eprintln!(
"[pool_manager] Pool ready with {} containers (target: {})",
final_count, self.target_size
existing_count + provisioned, self.target_size
);
Ok(())
}

/// Called after a task completes to replenish the pool.
/// Decrements active count and spawns a background provision task.
/// Queries Azure for actual container count to avoid sync issues.
pub fn replenish(&self) {
let previous = self.active_count.fetch_sub(1, Ordering::SeqCst);
eprintln!(
"[pool_manager] Container finished, active: {} -> {}",
previous,
previous - 1
);

// Lock to serialize check-and-reserve, preventing concurrent replenish
// calls from both seeing count < target and over-provisioning
// Lock to serialize replenish calls
let _guard = match self.replenish_lock.lock() {
Ok(guard) => guard,
Err(e) => {
Expand All @@ -143,26 +130,34 @@ impl PoolManager {
}
};

let current = self.active_count.load(Ordering::SeqCst);
// Query Azure for actual container count
let current = match count_existing_containers(&self.config.resource_group) {
Ok(count) => count,
Err(e) => {
eprintln!("[pool_manager] Failed to count containers: {}", e);
return;
}
};

eprintln!(
"[pool_manager] Replenish check: {} containers exist, target is {}",
current, self.target_size
);

if current >= self.target_size {
eprintln!("[pool_manager] Pool at target size, skipping replenish");
return;
}

// Reserve slot before spawning
self.active_count.fetch_add(1, Ordering::SeqCst);

let handle = match self.runtime_handle.get() {
Some(h) => h,
None => {
eprintln!("[pool_manager] No runtime handle available, skipping replenish");
self.active_count.fetch_sub(1, Ordering::SeqCst);
return;
}
};

let config = self.config.clone();
let active_count = Arc::clone(&self.active_count);

drop(_guard); // Release lock before spawning async work

Expand All @@ -173,17 +168,15 @@ impl PoolManager {
eprintln!("[pool_manager] Replenished with: {}", name);
}
Err(e) => {
// Rollback reservation on failure
active_count.fetch_sub(1, Ordering::SeqCst);
eprintln!("[pool_manager] Replenish failed: {}", e);
}
}
});
}

/// Get current number of active containers.
/// Get current number of active containers by querying Azure.
pub fn active_count(&self) -> usize {
self.active_count.load(Ordering::SeqCst)
count_existing_containers(&self.config.resource_group).unwrap_or(0)
}

/// Get target pool size.
Expand Down Expand Up @@ -522,7 +515,6 @@ mod tests {

let manager = PoolManager::new(config, Some(5));
assert_eq!(manager.target_size(), 5);
assert_eq!(manager.active_count(), 0);
assert_eq!(manager.task_queue(), "test-tasks");
assert_eq!(manager.completion_queue(), "test-completions");
}
Expand Down Expand Up @@ -660,45 +652,4 @@ mod tests {
assert_eq!(CONTAINER_PREFIX, "dwz-warm-");
}

#[test]
fn test_pool_manager_active_count_starts_at_zero() {
let config = test_config();
let manager = PoolManager::new(config, Some(5));

// Before initialize, active_count should be 0
assert_eq!(manager.active_count(), 0);
}

#[test]
fn test_pool_manager_replenish_decrements_count() {
let config = test_config();
let manager = PoolManager::new(config, Some(5));

// Manually set active count to simulate initialized state
manager.active_count.store(5, Ordering::SeqCst);
assert_eq!(manager.active_count(), 5);

// replenish() decrements the count (note: won't spawn without runtime handle)
manager.replenish();
assert_eq!(manager.active_count(), 4);

manager.replenish();
assert_eq!(manager.active_count(), 3);
}

#[test]
fn test_pool_manager_replenish_skips_when_at_target() {
let config = test_config();
let manager = PoolManager::new(config, Some(3));

// Set active count above target
manager.active_count.store(5, Ordering::SeqCst);

// replenish decrements but logs "skipping" since still >= target
manager.replenish(); // 5 -> 4, still >= 3
assert_eq!(manager.active_count(), 4);

manager.replenish(); // 4 -> 3, still >= 3
assert_eq!(manager.active_count(), 3);
}
}
Loading