Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .agents/skills/scheduler_maintain/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ SCHEDULER_ACTIONS_JSON_END
## Rules
- Use RFC3339 UTC timestamps.
- Cron uses 6 fields: `sec min hour day month weekday`.
- Prefer named weekdays like `MON-FRI` for recurring weekday schedules. Do not use numeric
weekday ranges for Monday-Friday here; in this scheduler parser, `1-5` is ambiguous and can
behave like Sunday-Thursday.
- Do not include workspace paths; `create_run_task` always targets the current workspace.
- Output only JSON inside blocks; no commentary inside blocks.
- If no changes are requested, omit the relevant block.
2 changes: 2 additions & 0 deletions DoWhiz_service/scheduler_module/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Responsibilities:

Schedules:
- `Cron` (6 fields: `sec min hour day month weekday`, UTC)
- Prefer named weekdays like `MON-FRI` for weekday schedules. Numeric weekday ranges are
ambiguous with this scheduler's cron parser; for example, `1-5` can behave like Sunday-Thursday.
- `OneShot` (`run_at` timestamp)

## Channels
Expand Down
29 changes: 22 additions & 7 deletions DoWhiz_service/scheduler_module/src/scheduler/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ use crate::thread_state::{current_thread_epoch, default_thread_state_path};

use super::core::Scheduler;
use super::executor::TaskExecutor;
use super::load_run_task_request_context;
use super::reply::load_reply_context;
use super::schedule::{next_run_after, validate_cron_expression};
use super::schedule::{
next_run_after, normalize_weekday_cron_expression, validate_cron_expression,
};
use super::store::SchedulerStore;
use super::types::{RunTaskTask, Schedule, ScheduledTask, SchedulerError, SendReplyTask, TaskKind};
use super::utils::parse_datetime;
Expand Down Expand Up @@ -1247,6 +1250,7 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
let mut rescheduled = 0usize;
let mut created = 0usize;
let mut skipped = 0usize;
let request_context = load_run_task_request_context(task);

for action in actions {
match action {
Expand Down Expand Up @@ -1279,7 +1283,11 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
continue;
}
};
match resolve_schedule_request(schedule, now) {
match resolve_schedule_request_with_context(
schedule,
now,
request_context.as_deref(),
) {
Ok(new_schedule) => {
target.schedule = new_schedule;
target.enabled = true;
Expand All @@ -1301,7 +1309,11 @@ pub(crate) fn apply_scheduler_actions<E: TaskExecutor>(
codex_disabled,
reply_to,
} => {
let schedule = match resolve_schedule_request(schedule, now) {
let schedule = match resolve_schedule_request_with_context(
schedule,
now,
request_context.as_deref(),
) {
Ok(schedule) => schedule,
Err(err) => {
warn!(
Expand Down Expand Up @@ -1369,16 +1381,19 @@ fn parse_action_task_ids(task_ids: &[String]) -> (HashSet<Uuid>, Vec<String>) {
(ids, invalid)
}

pub(crate) fn resolve_schedule_request(
pub(crate) fn resolve_schedule_request_with_context(
schedule: &run_task_module::ScheduleRequest,
now: DateTime<Utc>,
request_context: Option<&str>,
) -> Result<Schedule, SchedulerError> {
match schedule {
run_task_module::ScheduleRequest::Cron { expression } => {
validate_cron_expression(expression)?;
let next_run = next_run_after(expression, now)?;
let normalized_expression =
normalize_weekday_cron_expression(expression, request_context);
validate_cron_expression(&normalized_expression)?;
let next_run = next_run_after(&normalized_expression, now)?;
Ok(Schedule::Cron {
expression: expression.clone(),
expression: normalized_expression,
next_run,
})
}
Expand Down
58 changes: 57 additions & 1 deletion DoWhiz_service/scheduler_module/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ pub use types::{
pub use utils::load_google_access_token_from_service_env;

use chrono::{DateTime, Duration as ChronoDuration, Utc};
use std::fs;
use std::path::Path;

use self::schedule::next_run_after;
use self::schedule::{next_run_after, normalize_weekday_cron_expression};

const ROUTINE_ONE_SHOT_DELAY_THRESHOLD_MINUTES: i64 = 5;

Expand Down Expand Up @@ -127,5 +128,60 @@ pub fn prepare_task_for_resume(
Ok(resumed)
}

/// Load the original request text stored next to a run_task's input email.
///
/// The task's `input_email_dir` is used as-is when it is absolute and is otherwise resolved
/// against `workspace_dir`. The function then reads `thread_request.md` from that directory.
///
/// Returns the trimmed file contents, or `None` when the file cannot be read or holds only
/// whitespace.
pub(crate) fn load_run_task_request_context(task: &RunTaskTask) -> Option<String> {
    let email_dir = if task.input_email_dir.is_absolute() {
        task.input_email_dir.clone()
    } else {
        task.workspace_dir.join(&task.input_email_dir)
    };

    // Best-effort lookup: a read failure is treated the same as a missing request.
    let raw = fs::read_to_string(email_dir.join("thread_request.md")).ok()?;
    let request = raw.trim();
    (!request.is_empty()).then(|| request.to_string())
}

/// Repair legacy weekday cron expressions for stored run_task schedules.
///
/// This is a compatibility shim for older model-generated schedules that encoded a Monday-Friday
/// request with numeric weekday fields like `1-5`, which the scheduler's cron parser can
/// interpret as Sunday-Thursday. We only rewrite the cron when the original request context
/// clearly asked for weekdays, so legitimate Sunday-Thursday routines remain untouched.
///
/// Returns `Ok(true)` when the task's cron expression and `next_run` were rewritten in place
/// (persisting the change is left to the caller), and `Ok(false)` when the task is not a cron
/// run_task or its expression needs no repair.
///
/// # Errors
///
/// Propagates the error from `next_run_after` when no next run can be computed for the
/// rewritten expression. In that case `task` is left unmodified.
pub(crate) fn maybe_repair_legacy_weekday_cron_task(
    task: &mut ScheduledTask,
    now: DateTime<Utc>,
) -> Result<bool, SchedulerError> {
    // Cheap structural checks come first so that one-shot and non-run_task entries never
    // trigger the request-context file read below.
    let Schedule::Cron {
        expression,
        next_run,
    } = &mut task.schedule
    else {
        return Ok(false);
    };
    let TaskKind::RunTask(run_task) = &task.kind else {
        return Ok(false);
    };

    let request_context = load_run_task_request_context(run_task);
    let normalized = normalize_weekday_cron_expression(expression, request_context.as_deref());
    if normalized == *expression {
        return Ok(false);
    }

    // Never schedule relative to a point earlier than `now`, but respect a `last_run` that is
    // (unexpectedly) in the future.
    let reference_time = task.last_run.map_or(now, |last_run| last_run.max(now));

    // Compute the fallible part before mutating, so an error cannot leave the task with a
    // rewritten expression and a stale `next_run`.
    let repaired_next_run = next_run_after(&normalized, reference_time)?;
    *expression = normalized;
    *next_run = repaired_next_run;
    Ok(true)
}

#[cfg(test)]
mod tests;
49 changes: 49 additions & 0 deletions DoWhiz_service/scheduler_module/src/scheduler/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ use std::str::FromStr;

use super::types::SchedulerError;

/// Phrases that mark a request as asking for a Monday-Friday cadence.
///
/// `request_implies_monday_through_friday` looks for these as substrings of the
/// ASCII-lowercased request text, so every entry must itself be lowercase.
const WEEKDAY_REQUEST_MARKERS: &[&str] = &[
"weekday",
"weekdays",
"business day",
"business days",
"monday through friday",
"monday to friday",
"mon-fri",
"mon thru fri",
];

pub(crate) fn validate_cron_expression(expression: &str) -> Result<(), SchedulerError> {
let fields = expression.split_whitespace().count();
if fields != 6 {
Expand All @@ -25,3 +36,41 @@ pub(crate) fn next_run_after(
}
Err(SchedulerError::NoNextRun)
}

/// Rewrite a legacy numeric weekday field to `MON-FRI` when the request asked for weekdays.
///
/// The cron parser used by the scheduler does not follow the common Unix-cron numeric weekday
/// convention, so numeric ranges such as `1-5` have historically produced Sunday-Thursday runs
/// for requests that meant Monday-Friday.
///
/// The expression is rewritten only when all of the following hold:
/// - `request_context` is present and contains a Monday-Friday marker phrase;
/// - the expression has exactly six whitespace-separated fields;
/// - the sixth (weekday) field is one of the known legacy numeric spellings.
///
/// In every other case the input is returned unchanged. A rewritten expression has its fields
/// joined by single spaces.
pub(crate) fn normalize_weekday_cron_expression(
    expression: &str,
    request_context: Option<&str>,
) -> String {
    let asked_for_weekdays = match request_context {
        Some(text) => request_implies_monday_through_friday(text),
        None => false,
    };
    if !asked_for_weekdays {
        return expression.to_string();
    }

    let parts: Vec<&str> = expression.split_whitespace().collect();
    match parts.as_slice() {
        [sec, min, hour, day, month, weekday] if is_legacy_weekday_field(weekday) => {
            [*sec, *min, *hour, *day, *month, "MON-FRI"].join(" ")
        }
        _ => expression.to_string(),
    }
}

/// Report whether `text` contains any Monday-Friday marker phrase.
///
/// Matching is a plain substring search over the ASCII-lowercased text, so it is
/// case-insensitive for ASCII letters only.
fn request_implies_monday_through_friday(text: &str) -> bool {
    let lowered = text.to_ascii_lowercase();
    for marker in WEEKDAY_REQUEST_MARKERS {
        if lowered.contains(*marker) {
            return true;
        }
    }
    false
}

/// Report whether a cron weekday field is one of the legacy numeric Monday-Friday spellings.
///
/// Surrounding whitespace is ignored; the comparison is otherwise exact.
fn is_legacy_weekday_field(field: &str) -> bool {
    const LEGACY_WEEKDAY_FIELDS: [&str; 4] = ["0-4", "0,1,2,3,4", "1-5", "1,2,3,4,5"];
    let candidate = field.trim();
    LEGACY_WEEKDAY_FIELDS
        .iter()
        .any(|legacy| *legacy == candidate)
}
52 changes: 38 additions & 14 deletions DoWhiz_service/scheduler_module/src/scheduler/store/mongo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use uuid::Uuid;

use crate::mongo_store::{create_client_from_env, database_from_env, ensure_index_compatible};

use super::super::is_user_visible_routine_task;
use super::super::types::{Schedule, ScheduledTask, SchedulerError};
use super::super::utils::{task_kind_channel, task_kind_label};
use super::super::{is_user_visible_routine_task, maybe_repair_legacy_weekday_cron_task};
use super::{RoutineSummary, TaskDebugArchiveRecord, TaskStatusSummary};

static EXECUTION_SEQ: AtomicI64 = AtomicI64::new(1);
Expand Down Expand Up @@ -112,14 +112,18 @@ impl MongoSchedulerStore {
.map_err(mongo_err)?;
let mut seen_task_ids = HashSet::new();
let mut tasks = Vec::new();
let now = Utc::now();
for row in cursor {
let document = row.map_err(mongo_err)?;
if let Ok(task_id) = document.get_str("task_id") {
if !seen_task_ids.insert(task_id.to_string()) {
continue;
}
}
let task = deserialize_task_document(&document)?;
let mut task = deserialize_task_document(&document)?;
if maybe_repair_legacy_weekday_cron_task(&mut task, now)? {
self.update_task(&task)?;
}
tasks.push(task);
}
tasks.sort_by_key(|task| task.created_at);
Expand All @@ -134,7 +138,15 @@ impl MongoSchedulerStore {
.tasks
.find_one(self.task_filter(task_id), None)
.map_err(mongo_err)?;
document.as_ref().map(deserialize_task_document).transpose()
let Some(document) = document else {
return Ok(None);
};

let mut task = deserialize_task_document(&document)?;
if maybe_repair_legacy_weekday_cron_task(&mut task, Utc::now())? {
self.update_task(&task)?;
}
Ok(Some(task))
}

pub(crate) fn insert_task(&self, task: &ScheduledTask) -> Result<(), SchedulerError> {
Expand Down Expand Up @@ -391,6 +403,7 @@ impl MongoSchedulerStore {
.map_err(mongo_err)?;
let mut summaries = Vec::new();
let mut seen_task_ids = HashSet::new();
let now = Utc::now();
for row in cursor {
let task_doc = row.map_err(mongo_err)?;
let task_id = task_doc
Expand All @@ -399,23 +412,31 @@ impl MongoSchedulerStore {
if !seen_task_ids.insert(task_id.to_string()) {
continue;
}
let mut task = deserialize_task_document(&task_doc)?;
if maybe_repair_legacy_weekday_cron_task(&mut task, now)? {
self.update_task(&task)?;
}
let request_summary = derive_request_summary(&task_doc);
let schedule = task_doc.get_document("schedule").ok();
let execution = self.latest_execution_for_task(task_id)?;
let (schedule_type, next_run, run_at) = match &task.schedule {
Schedule::Cron { next_run, .. } => {
("cron".to_string(), Some(next_run.to_rfc3339()), None)
}
Schedule::OneShot { run_at } => {
("one_shot".to_string(), None, Some(run_at.to_rfc3339()))
}
};
summaries.push(TaskStatusSummary {
id: task_id.to_string(),
kind: task_doc.get_str("kind").unwrap_or("unknown").to_string(),
channel: task_doc.get_str("channel").unwrap_or("email").to_string(),
request_summary,
enabled: task_doc.get_bool("enabled").unwrap_or(false),
created_at: datetime_field_to_rfc3339(&task_doc, "created_at").unwrap_or_default(),
last_run: datetime_field_to_rfc3339(&task_doc, "last_run"),
schedule_type: schedule
.and_then(|doc| doc.get_str("type").ok())
.unwrap_or("one_shot")
.to_string(),
next_run: schedule.and_then(|doc| datetime_field_to_rfc3339(doc, "next_run")),
run_at: schedule.and_then(|doc| datetime_field_to_rfc3339(doc, "run_at")),
enabled: task.enabled,
created_at: task.created_at.to_rfc3339(),
last_run: task.last_run.map(|value| value.to_rfc3339()),
schedule_type,
next_run,
run_at,
execution_status: execution
.as_ref()
.and_then(|doc| doc.get_str("status").ok())
Expand Down Expand Up @@ -456,7 +477,10 @@ impl MongoSchedulerStore {
continue;
}

let task = deserialize_task_document(&task_doc)?;
let mut task = deserialize_task_document(&task_doc)?;
if maybe_repair_legacy_weekday_cron_task(&mut task, now)? {
self.update_task(&task)?;
}
if !is_user_visible_routine_task(&task, now) {
continue;
}
Expand Down
Loading
Loading