Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .agents/skills/scheduler_maintain/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ SCHEDULER_ACTIONS_JSON_END
## Rules
- Use RFC3339 UTC timestamps.
- Cron uses 6 fields: `sec min hour day month weekday`.
- Prefer named weekdays like `MON-FRI` for recurring weekday schedules. Do not use numeric
weekday ranges for Monday-Friday here; in this scheduler parser, `1-5` is ambiguous and can
behave like Sunday-Thursday.
- Do not include workspace paths; `create_run_task` always targets the current workspace.
- Output only JSON inside blocks; no commentary inside blocks.
- If no changes are requested, omit the relevant block.
93 changes: 22 additions & 71 deletions DoWhiz_service/run_task_module/src/run_task/pool_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
//! - Scheduler polls completion queue, then calls replenish()

use std::process::Command;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::sync::{Mutex, OnceLock};
use uuid::Uuid;

const DEFAULT_POOL_SIZE: usize = 5;
Expand Down Expand Up @@ -50,7 +49,6 @@ pub struct PoolConfig {
/// Manages a pool of warm ACI containers.
pub struct PoolManager {
config: PoolConfig,
active_count: Arc<AtomicUsize>,
target_size: usize,
/// Tokio runtime handle captured during initialization for use in sync contexts
runtime_handle: OnceLock<tokio::runtime::Handle>,
Expand All @@ -63,7 +61,6 @@ impl PoolManager {
pub fn new(config: PoolConfig, target_size: Option<usize>) -> Self {
Self {
config,
active_count: Arc::new(AtomicUsize::new(0)),
target_size: target_size.unwrap_or(DEFAULT_POOL_SIZE),
runtime_handle: OnceLock::new(),
replenish_lock: Mutex::new(()),
Expand All @@ -82,8 +79,6 @@ impl PoolManager {

// Count existing warm containers
let existing_count = count_existing_containers(&self.config.resource_group)?;
self.active_count.store(existing_count, Ordering::SeqCst);

let containers_needed = self.target_size.saturating_sub(existing_count);

eprintln!(
Expand All @@ -104,37 +99,29 @@ impl PoolManager {
}));
}

let mut provisioned = 0;
for handle in handles {
match handle.await {
Ok(Ok(name)) => {
self.active_count.fetch_add(1, Ordering::SeqCst);
provisioned += 1;
eprintln!("[pool_manager] Provisioned: {}", name);
}
Ok(Err(e)) => eprintln!("[pool_manager] Provision failed: {}", e),
Err(e) => eprintln!("[pool_manager] Task join failed: {}", e),
}
}

let final_count = self.active_count.load(Ordering::SeqCst);
eprintln!(
"[pool_manager] Pool ready with {} containers (target: {})",
final_count, self.target_size
existing_count + provisioned, self.target_size
);
Ok(())
}

/// Called after a task completes to replenish the pool.
/// Decrements active count and spawns a background provision task.
/// Queries Azure for actual container count to avoid sync issues.
pub fn replenish(&self) {
let previous = self.active_count.fetch_sub(1, Ordering::SeqCst);
eprintln!(
"[pool_manager] Container finished, active: {} -> {}",
previous,
previous - 1
);

// Lock to serialize check-and-reserve, preventing concurrent replenish
// calls from both seeing count < target and over-provisioning
// Lock to serialize replenish calls
let _guard = match self.replenish_lock.lock() {
Ok(guard) => guard,
Err(e) => {
Expand All @@ -143,26 +130,34 @@ impl PoolManager {
}
};

let current = self.active_count.load(Ordering::SeqCst);
// Query Azure for actual container count
let current = match count_existing_containers(&self.config.resource_group) {
Ok(count) => count,
Err(e) => {
eprintln!("[pool_manager] Failed to count containers: {}", e);
return;
}
};

eprintln!(
"[pool_manager] Replenish check: {} containers exist, target is {}",
current, self.target_size
);

if current >= self.target_size {
eprintln!("[pool_manager] Pool at target size, skipping replenish");
return;
}

// Reserve slot before spawning
self.active_count.fetch_add(1, Ordering::SeqCst);

let handle = match self.runtime_handle.get() {
Some(h) => h,
None => {
eprintln!("[pool_manager] No runtime handle available, skipping replenish");
self.active_count.fetch_sub(1, Ordering::SeqCst);
return;
}
};

let config = self.config.clone();
let active_count = Arc::clone(&self.active_count);

drop(_guard); // Release lock before spawning async work

Expand All @@ -173,17 +168,15 @@ impl PoolManager {
eprintln!("[pool_manager] Replenished with: {}", name);
}
Err(e) => {
// Rollback reservation on failure
active_count.fetch_sub(1, Ordering::SeqCst);
eprintln!("[pool_manager] Replenish failed: {}", e);
}
}
});
}

/// Get current number of active containers.
/// Get current number of active containers by querying Azure.
pub fn active_count(&self) -> usize {
self.active_count.load(Ordering::SeqCst)
count_existing_containers(&self.config.resource_group).unwrap_or(0)
}

/// Get target pool size.
Expand Down Expand Up @@ -522,7 +515,6 @@ mod tests {

let manager = PoolManager::new(config, Some(5));
assert_eq!(manager.target_size(), 5);
assert_eq!(manager.active_count(), 0);
assert_eq!(manager.task_queue(), "test-tasks");
assert_eq!(manager.completion_queue(), "test-completions");
}
Expand Down Expand Up @@ -660,45 +652,4 @@ mod tests {
assert_eq!(CONTAINER_PREFIX, "dwz-warm-");
}

#[test]
fn test_pool_manager_active_count_starts_at_zero() {
let config = test_config();
let manager = PoolManager::new(config, Some(5));

// Before initialize, active_count should be 0
assert_eq!(manager.active_count(), 0);
}

#[test]
fn test_pool_manager_replenish_decrements_count() {
let config = test_config();
let manager = PoolManager::new(config, Some(5));

// Manually set active count to simulate initialized state
manager.active_count.store(5, Ordering::SeqCst);
assert_eq!(manager.active_count(), 5);

// replenish() decrements the count (note: won't spawn without runtime handle)
manager.replenish();
assert_eq!(manager.active_count(), 4);

manager.replenish();
assert_eq!(manager.active_count(), 3);
}

#[test]
fn test_pool_manager_replenish_skips_when_at_target() {
let config = test_config();
let manager = PoolManager::new(config, Some(3));

// Set active count above target
manager.active_count.store(5, Ordering::SeqCst);

// replenish decrements but logs "skipping" since still >= target
manager.replenish(); // 5 -> 4, still >= 3
assert_eq!(manager.active_count(), 4);

manager.replenish(); // 4 -> 3, still >= 3
assert_eq!(manager.active_count(), 3);
}
}
2 changes: 2 additions & 0 deletions DoWhiz_service/scheduler_module/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Responsibilities:

Schedules:
- `Cron` (6 fields: `sec min hour day month weekday`, UTC)
- Prefer named weekdays like `MON-FRI` for weekday schedules. Numeric weekday ranges are
parser-ambiguous here; for example, `1-5` can behave like Sunday-Thursday.
- `OneShot` (`run_at` timestamp)

## Channels
Expand Down
29 changes: 22 additions & 7 deletions DoWhiz_service/scheduler_module/src/scheduler/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ use crate::thread_state::{current_thread_epoch, default_thread_state_path};

use super::core::Scheduler;
use super::executor::TaskExecutor;
use super::load_run_task_request_context;
use super::reply::load_reply_context;
use super::schedule::{next_run_after, validate_cron_expression};
use super::schedule::{
next_run_after, normalize_weekday_cron_expression, validate_cron_expression,
};
use super::store::SchedulerStore;
use super::types::{RunTaskTask, Schedule, ScheduledTask, SchedulerError, SendReplyTask, TaskKind};
use super::utils::parse_datetime;
Expand Down Expand Up @@ -1247,6 +1250,7 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
let mut rescheduled = 0usize;
let mut created = 0usize;
let mut skipped = 0usize;
let request_context = load_run_task_request_context(task);

for action in actions {
match action {
Expand Down Expand Up @@ -1279,7 +1283,11 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
continue;
}
};
match resolve_schedule_request(schedule, now) {
match resolve_schedule_request_with_context(
schedule,
now,
request_context.as_deref(),
) {
Ok(new_schedule) => {
target.schedule = new_schedule;
target.enabled = true;
Expand All @@ -1301,7 +1309,11 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
codex_disabled,
reply_to,
} => {
let schedule = match resolve_schedule_request(schedule, now) {
let schedule = match resolve_schedule_request_with_context(
schedule,
now,
request_context.as_deref(),
) {
Ok(schedule) => schedule,
Err(err) => {
warn!(
Expand Down Expand Up @@ -1369,16 +1381,19 @@ fn parse_action_task_ids(task_ids: &[String]) -> (HashSet<Uuid>, Vec<String>) {
(ids, invalid)
}

pub(crate) fn resolve_schedule_request(
pub(crate) fn resolve_schedule_request_with_context(
schedule: &run_task_module::ScheduleRequest,
now: DateTime<Utc>,
request_context: Option<&str>,
) -> Result<Schedule, SchedulerError> {
match schedule {
run_task_module::ScheduleRequest::Cron { expression } => {
validate_cron_expression(expression)?;
let next_run = next_run_after(expression, now)?;
let normalized_expression =
normalize_weekday_cron_expression(expression, request_context);
validate_cron_expression(&normalized_expression)?;
let next_run = next_run_after(&normalized_expression, now)?;
Ok(Schedule::Cron {
expression: expression.clone(),
expression: normalized_expression,
next_run,
})
}
Expand Down
58 changes: 57 additions & 1 deletion DoWhiz_service/scheduler_module/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ pub use types::{
pub use utils::load_google_access_token_from_service_env;

use chrono::{DateTime, Duration as ChronoDuration, Utc};
use std::fs;
use std::path::Path;

use self::schedule::next_run_after;
use self::schedule::{next_run_after, normalize_weekday_cron_expression};

const ROUTINE_ONE_SHOT_DELAY_THRESHOLD_MINUTES: i64 = 5;

Expand Down Expand Up @@ -127,5 +128,60 @@ pub fn prepare_task_for_resume(
Ok(resumed)
}

/// Best-effort load of the original request text (`thread_request.md`) for a run task.
///
/// Returns `None` when the file is missing, unreadable, or blank after trimming.
pub(crate) fn load_run_task_request_context(task: &RunTaskTask) -> Option<String> {
    // Resolve the email directory against the workspace when it is relative.
    let email_dir = if task.input_email_dir.is_absolute() {
        task.input_email_dir.clone()
    } else {
        task.workspace_dir.join(&task.input_email_dir)
    };

    // A missing or unreadable file is treated the same as an empty request.
    fs::read_to_string(email_dir.join("thread_request.md"))
        .ok()
        .map(|content| content.trim().to_string())
        .filter(|context| !context.is_empty())
}

/// Repair legacy weekday cron expressions for stored run_task schedules.
///
/// This is a compatibility shim for older model-generated schedules that encoded a Monday-Friday
/// request with numeric weekday fields like `1-5`, which the scheduler's cron parser can
/// interpret as Sunday-Thursday. We only rewrite the cron when the original request context
/// clearly asked for weekdays, so legitimate Sunday-Thursday routines remain untouched.
///
/// Returns `Ok(true)` when the expression was rewritten (and `next_run` recomputed),
/// `Ok(false)` when no change was needed or the task is not a cron-scheduled run task.
pub(crate) fn maybe_repair_legacy_weekday_cron_task(
    task: &mut ScheduledTask,
    now: DateTime<Utc>,
) -> Result<bool, SchedulerError> {
    // Only cron schedules carry a weekday field. Check this before loading the request
    // context so non-cron tasks skip the filesystem read entirely.
    if !matches!(task.schedule, Schedule::Cron { .. }) {
        return Ok(false);
    }

    // Only run tasks have an on-disk request context; other task kinds are never rewritten.
    let request_context = match &task.kind {
        TaskKind::RunTask(run_task) => load_run_task_request_context(run_task),
        _ => return Ok(false),
    };

    let Schedule::Cron {
        expression,
        next_run,
    } = &mut task.schedule
    else {
        // Unreachable: guarded by the matches! check above.
        return Ok(false);
    };

    let normalized = normalize_weekday_cron_expression(expression, request_context.as_deref());
    if normalized == *expression {
        return Ok(false);
    }

    // Recompute from the later of `last_run` and `now` so the repaired schedule never
    // points into the past or re-fires a run that already happened.
    let reference_time = task.last_run.map_or(now, |last| last.max(now));
    *expression = normalized;
    *next_run = next_run_after(expression, reference_time)?;
    Ok(true)
}

#[cfg(test)]
mod tests;
Loading
Loading