diff --git a/DoWhiz_service/run_task_module/src/run_task/pool_manager.rs b/DoWhiz_service/run_task_module/src/run_task/pool_manager.rs index 938fdf18..c2ec9e85 100644 --- a/DoWhiz_service/run_task_module/src/run_task/pool_manager.rs +++ b/DoWhiz_service/run_task_module/src/run_task/pool_manager.rs @@ -11,8 +11,7 @@ //! - Scheduler polls completion queue, then calls replenish() use std::process::Command; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, OnceLock}; +use std::sync::{Mutex, OnceLock}; use uuid::Uuid; const DEFAULT_POOL_SIZE: usize = 5; @@ -50,7 +49,6 @@ pub struct PoolConfig { /// Manages a pool of warm ACI containers. pub struct PoolManager { config: PoolConfig, - active_count: Arc, target_size: usize, /// Tokio runtime handle captured during initialization for use in sync contexts runtime_handle: OnceLock, @@ -63,7 +61,6 @@ impl PoolManager { pub fn new(config: PoolConfig, target_size: Option) -> Self { Self { config, - active_count: Arc::new(AtomicUsize::new(0)), target_size: target_size.unwrap_or(DEFAULT_POOL_SIZE), runtime_handle: OnceLock::new(), replenish_lock: Mutex::new(()), @@ -82,8 +79,6 @@ impl PoolManager { // Count existing warm containers let existing_count = count_existing_containers(&self.config.resource_group)?; - self.active_count.store(existing_count, Ordering::SeqCst); - let containers_needed = self.target_size.saturating_sub(existing_count); eprintln!( @@ -104,10 +99,11 @@ impl PoolManager { })); } + let mut provisioned = 0; for handle in handles { match handle.await { Ok(Ok(name)) => { - self.active_count.fetch_add(1, Ordering::SeqCst); + provisioned += 1; eprintln!("[pool_manager] Provisioned: {}", name); } Ok(Err(e)) => eprintln!("[pool_manager] Provision failed: {}", e), @@ -115,26 +111,17 @@ impl PoolManager { } } - let final_count = self.active_count.load(Ordering::SeqCst); eprintln!( "[pool_manager] Pool ready with {} containers (target: {})", - final_count, self.target_size + existing_count + provisioned, self.target_size ); Ok(()) } /// Called after a task completes to replenish the pool. - /// Decrements active count and spawns a background provision task. + /// Queries Azure for actual container count to avoid sync issues. pub fn replenish(&self) { - let previous = self.active_count.fetch_sub(1, Ordering::SeqCst); - eprintln!( - "[pool_manager] Container finished, active: {} -> {}", - previous, - previous - 1 - ); - - // Lock to serialize check-and-reserve, preventing concurrent replenish - // calls from both seeing count < target and over-provisioning + // Lock to serialize replenish calls let _guard = match self.replenish_lock.lock() { Ok(guard) => guard, Err(e) => { @@ -143,26 +130,34 @@ impl PoolManager { } }; - let current = self.active_count.load(Ordering::SeqCst); + // Query Azure for actual container count + let current = match count_existing_containers(&self.config.resource_group) { + Ok(count) => count, + Err(e) => { + eprintln!("[pool_manager] Failed to count containers: {}", e); + return; + } + }; + + eprintln!( + "[pool_manager] Replenish check: {} containers exist, target is {}", + current, self.target_size + ); + if current >= self.target_size { eprintln!("[pool_manager] Pool at target size, skipping replenish"); return; } - // Reserve slot before spawning - self.active_count.fetch_add(1, Ordering::SeqCst); - let handle = match self.runtime_handle.get() { Some(h) => h, None => { eprintln!("[pool_manager] No runtime handle available, skipping replenish"); - self.active_count.fetch_sub(1, Ordering::SeqCst); return; } }; let config = self.config.clone(); - let active_count = Arc::clone(&self.active_count); drop(_guard); // Release lock before spawning async work @@ -173,17 +168,15 @@ impl PoolManager { eprintln!("[pool_manager] Replenished with: {}", name); } Err(e) => { - // Rollback reservation on failure - active_count.fetch_sub(1, Ordering::SeqCst); eprintln!("[pool_manager] Replenish failed: {}", e); } } }); } - /// Get current number of active containers. + /// Get current number of active containers by querying Azure. pub fn active_count(&self) -> usize { - self.active_count.load(Ordering::SeqCst) + count_existing_containers(&self.config.resource_group).unwrap_or(0) } /// Get target pool size. @@ -522,7 +515,6 @@ mod tests { let manager = PoolManager::new(config, Some(5)); assert_eq!(manager.target_size(), 5); - assert_eq!(manager.active_count(), 0); assert_eq!(manager.task_queue(), "test-tasks"); assert_eq!(manager.completion_queue(), "test-completions"); } @@ -660,45 +652,4 @@ mod tests { assert_eq!(CONTAINER_PREFIX, "dwz-warm-"); } - #[test] - fn test_pool_manager_active_count_starts_at_zero() { - let config = test_config(); - let manager = PoolManager::new(config, Some(5)); - - // Before initialize, active_count should be 0 - assert_eq!(manager.active_count(), 0); - } - - #[test] - fn test_pool_manager_replenish_decrements_count() { - let config = test_config(); - let manager = PoolManager::new(config, Some(5)); - - // Manually set active count to simulate initialized state - manager.active_count.store(5, Ordering::SeqCst); - assert_eq!(manager.active_count(), 5); - - // replenish() decrements the count (note: won't spawn without runtime handle) - manager.replenish(); - assert_eq!(manager.active_count(), 4); - - manager.replenish(); - assert_eq!(manager.active_count(), 3); - } - - #[test] - fn test_pool_manager_replenish_skips_when_at_target() { - let config = test_config(); - let manager = PoolManager::new(config, Some(3)); - - // Set active count above target - manager.active_count.store(5, Ordering::SeqCst); - - // replenish decrements but logs "skipping" since still >= target - manager.replenish(); // 5 -> 4, still >= 3 - assert_eq!(manager.active_count(), 4); - - manager.replenish(); // 4 -> 3, still >= 3 - assert_eq!(manager.active_count(), 3); - } }