From 793f457278ccec3550e01c1ad3444fdb465f1e00 Mon Sep 17 00:00:00 2001 From: Hunter B Date: Fri, 5 Jun 2026 23:16:30 -0700 Subject: [PATCH] feat(whaleflow): mark mock cancellation and budgets --- CHANGELOG.md | 7 +- crates/tui/CHANGELOG.md | 7 +- crates/whaleflow/src/lib.rs | 262 ++++++++++++++++++++++++++++++++---- 3 files changed, 248 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13e054cb4..2e3be6910 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,8 +61,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670). Leaf, branch, and workflow execution results now carry deterministic token and cost telemetry fields that the mock executor can aggregate without live - provider calls or runtime sub-agent fanout (#2486). A crate-only replay - executor now evaluates workflows from recorded leaf/control records, computes + provider calls or runtime sub-agent fanout (#2486). The mock executor now + carries crate-local cancellation and budget-exhaustion status markers so the + branch/leaf runtime contract can be tested before live workflow execution is + exposed (#2669). A crate-only replay executor now evaluates workflows from + recorded leaf/control records, computes stable SHA-256 leaf input hashes, and marks missing records as `replay_diverged` instead of calling models again (#2673); the runtime replay command and live-provider replay fallback remain deferred. The crate also now diff --git a/crates/tui/CHANGELOG.md b/crates/tui/CHANGELOG.md index 13e054cb4..2e3be6910 100644 --- a/crates/tui/CHANGELOG.md +++ b/crates/tui/CHANGELOG.md @@ -61,8 +61,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 examples plus a one-pass repair for common `ctx.*` authoring aliases (#2670). Leaf, branch, and workflow execution results now carry deterministic token and cost telemetry fields that the mock executor can aggregate without live - provider calls or runtime sub-agent fanout (#2486). A crate-only replay - executor now evaluates workflows from recorded leaf/control records, computes + provider calls or runtime sub-agent fanout (#2486). The mock executor now + carries crate-local cancellation and budget-exhaustion status markers so the + branch/leaf runtime contract can be tested before live workflow execution is + exposed (#2669). A crate-only replay executor now evaluates workflows from + recorded leaf/control records, computes stable SHA-256 leaf input hashes, and marks missing records as `replay_diverged` instead of calling models again (#2673); the runtime replay command and live-provider replay fallback remain deferred. The crate also now diff --git a/crates/whaleflow/src/lib.rs b/crates/whaleflow/src/lib.rs index ae630b5b7..155946a75 100644 --- a/crates/whaleflow/src/lib.rs +++ b/crates/whaleflow/src/lib.rs @@ -542,6 +542,7 @@ pub enum WorkflowRunStatus { Succeeded, Failed, Cancelled, + BudgetExceeded, ReplayDiverged, } @@ -591,9 +592,24 @@ impl WorkflowExecution { self.status = WorkflowRunStatus::Failed; } + pub fn mark_cancelled(&mut self) { + self.status = WorkflowRunStatus::Cancelled; + } + + pub fn mark_budget_exceeded(&mut self) { + self.status = WorkflowRunStatus::BudgetExceeded; + } + pub(crate) fn mark_replay_diverged(&mut self) { self.status = WorkflowRunStatus::ReplayDiverged; } + + fn should_stop_mock_execution(&self) -> bool { + matches!( + self.status, + WorkflowRunStatus::Cancelled | WorkflowRunStatus::BudgetExceeded + ) + } } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -646,6 +662,9 @@ pub struct MockWorkflowExecutor { leaf_outcomes: BTreeMap, predicate_results: BTreeMap>, generated_nodes: BTreeMap>, + cancelled: bool, + max_leaf_steps: Option, + leaf_steps_executed: u32, } impl MockWorkflowExecutor { @@ -680,6 +699,16 @@ impl MockWorkflowExecutor { self } + pub fn with_cancelled(mut self) -> Self { + self.cancelled = true; + self + } + + pub fn with_max_leaf_steps(mut self, max_leaf_steps: u32) -> Self { + self.max_leaf_steps = Some(max_leaf_steps); + self + } + pub fn run( &mut self, spec: &WorkflowSpec, @@ -696,6 +725,9 @@ impl MockWorkflowExecutor { execution: &mut WorkflowExecution, ) -> Result<(), WorkflowExecutionError> { for node in nodes { + if execution.should_stop_mock_execution() { + break; + } self.execute_node(node, execution)?; } Ok(()) @@ -758,23 +790,14 @@ impl MockWorkflowExecutor { ) -> Result<(), WorkflowExecutionError> { let before = execution.leaf_results.len(); self.execute_nodes(&spec.children, execution)?; - let branch_failed = execution.leaf_results[before..] - .iter() - .any(|result| result.status != WorkflowRunStatus::Succeeded); - let status = if branch_failed { - WorkflowRunStatus::Failed - } else { - WorkflowRunStatus::Succeeded - }; + let status = aggregate_mock_status(&execution.leaf_results[before..]); let mut usage = WorkflowUsage::default(); let mut memo_usage = WorkflowMemoUsage::default(); for result in &execution.leaf_results[before..] { usage.add_assign(result.usage); memo_usage.add_assign(result.memo_usage); } - if status == WorkflowRunStatus::Failed { - execution.mark_failed(); - } + mark_execution_for_status(execution, status); execution.branch_results.push(BranchResult { branch_id: spec.id.clone(), task_id: spec.id.clone(), @@ -795,13 +818,8 @@ impl MockWorkflowExecutor { } fn execute_leaf(&mut self, spec: &LeafSpec, execution: &mut WorkflowExecution) { - let outcome = self - .leaf_outcomes - .remove(&spec.id) - .unwrap_or_else(|| MockLeafOutcome::succeeded(format!("mock leaf {}", spec.id))); - if outcome.status != WorkflowRunStatus::Succeeded { - execution.mark_failed(); - } + let outcome = self.mock_leaf_outcome(spec); + mark_execution_for_status(execution, outcome.status); execution.usage.add_assign(outcome.usage); execution.memo_usage.add_assign(outcome.memo_usage); execution.leaf_results.push(LeafResult { @@ -824,21 +842,27 @@ impl MockWorkflowExecutor { let mut iterations = 0; let mut passed = false; while iterations < max_iterations { + if execution.should_stop_mock_execution() { + break; + } iterations += 1; self.execute_nodes(&spec.children, execution)?; + if execution.should_stop_mock_execution() { + break; + } if self.next_predicate_result(&spec.id) { passed = true; break; } } - let status = if passed { + let status = if execution.should_stop_mock_execution() { + execution.status + } else if passed { WorkflowRunStatus::Succeeded } else { WorkflowRunStatus::Failed }; - if status == WorkflowRunStatus::Failed { - execution.mark_failed(); - } + mark_execution_for_status(execution, status); execution.control_node_results.push(ControlNodeResult { node_id: spec.id.clone(), kind: ControlNodeKind::LoopUntil, @@ -861,10 +885,15 @@ impl MockWorkflowExecutor { &spec.else_nodes }; self.execute_nodes(selected_nodes, execution)?; + let status = if execution.should_stop_mock_execution() { + execution.status + } else { + WorkflowRunStatus::Succeeded + }; execution.control_node_results.push(ControlNodeResult { node_id: spec.id.clone(), kind: ControlNodeKind::Cond, - status: WorkflowRunStatus::Succeeded, + status, selected_children: selected_nodes.iter().map(node_id).collect(), summary: Some(format!("predicate_result={passed}")), }); @@ -882,16 +911,47 @@ impl MockWorkflowExecutor { } validate_workflow_node_shapes(&nodes)?; self.execute_nodes(&nodes, execution)?; + let status = if execution.should_stop_mock_execution() { + execution.status + } else { + WorkflowRunStatus::Succeeded + }; execution.control_node_results.push(ControlNodeResult { node_id: spec.id.clone(), kind: ControlNodeKind::Expand, - status: WorkflowRunStatus::Succeeded, + status, selected_children: nodes.iter().map(node_id).collect(), summary: Some(format!("expanded_from={}", spec.source)), }); Ok(()) } + fn mock_leaf_outcome(&mut self, spec: &LeafSpec) -> MockLeafOutcome { + if self.cancelled { + return MockLeafOutcome { + status: WorkflowRunStatus::Cancelled, + usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), + output: Some("mock workflow cancelled before leaf execution".to_string()), + artifacts: Vec::new(), + }; + } + if self.max_leaf_steps == Some(self.leaf_steps_executed) || spec.budget.max_steps == Some(0) + { + return MockLeafOutcome { + status: WorkflowRunStatus::BudgetExceeded, + usage: WorkflowUsage::default(), + memo_usage: WorkflowMemoUsage::default(), + output: Some("mock workflow leaf budget exhausted".to_string()), + artifacts: Vec::new(), + }; + } + self.leaf_steps_executed = self.leaf_steps_executed.saturating_add(1); + self.leaf_outcomes + .remove(&spec.id) + .unwrap_or_else(|| MockLeafOutcome::succeeded(format!("mock leaf {}", spec.id))) + } + fn next_predicate_result(&mut self, node_id: &str) -> bool { let Some(results) = self.predicate_results.get_mut(node_id) else { return false; @@ -903,6 +963,37 @@ impl MockWorkflowExecutor { } } +fn aggregate_mock_status(results: &[LeafResult]) -> WorkflowRunStatus { + if results + .iter() + .any(|result| result.status == WorkflowRunStatus::Cancelled) + { + WorkflowRunStatus::Cancelled + } else if results + .iter() + .any(|result| result.status == WorkflowRunStatus::BudgetExceeded) + { + WorkflowRunStatus::BudgetExceeded + } else if results + .iter() + .any(|result| result.status != WorkflowRunStatus::Succeeded) + { + WorkflowRunStatus::Failed + } else { + WorkflowRunStatus::Succeeded + } +} + +fn mark_execution_for_status(execution: &mut WorkflowExecution, status: WorkflowRunStatus) { + match status { + WorkflowRunStatus::Succeeded | WorkflowRunStatus::Pending | WorkflowRunStatus::Running => {} + WorkflowRunStatus::Failed => execution.mark_failed(), + WorkflowRunStatus::Cancelled => execution.mark_cancelled(), + WorkflowRunStatus::BudgetExceeded => execution.mark_budget_exceeded(), + WorkflowRunStatus::ReplayDiverged => execution.mark_replay_diverged(), + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BranchCandidate { pub branch_id: String, @@ -1720,6 +1811,21 @@ mod tests { }) } + fn leaf_node_with_budget(id: &str, budget: BudgetSpec) -> WorkflowNode { + WorkflowNode::Leaf(LeafSpec { + id: id.to_string(), + prompt: format!("run {id}"), + agent_type: AgentType::General, + mode: TaskMode::ReadOnly, + isolation: IsolationMode::Shared, + file_scope: Vec::new(), + depends_on_results: Vec::new(), + budget, + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + }) + } + fn invalid_leaf_node(id: &str) -> WorkflowNode { WorkflowNode::Leaf(LeafSpec { id: id.to_string(), @@ -2399,6 +2505,114 @@ mod tests { ); } + #[test] + fn mock_executor_marks_cancelled_before_leaf() { + let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec { + id: "discover".to_string(), + description: None, + parallel: true, + budget: BudgetSpec::default(), + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + children: vec![leaf_node("scan-readme"), leaf_node("scan-tests")], + })]); + + let mut executor = MockWorkflowExecutor::new().with_cancelled(); + let execution = executor.run(&workflow).expect("mock workflow should run"); + + assert_eq!(execution.status, WorkflowRunStatus::Cancelled); + assert_eq!(execution.leaf_results.len(), 1); + assert_eq!( + execution.leaf_results[0].status, + WorkflowRunStatus::Cancelled + ); + assert_eq!( + execution.branch_results[0].status, + WorkflowRunStatus::Cancelled + ); + assert_eq!( + control_result(&execution, "discover").status, + WorkflowRunStatus::Cancelled + ); + } + + #[test] + fn mock_executor_stops_when_global_leaf_budget_is_exhausted() { + let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec { + id: "discover".to_string(), + description: None, + parallel: true, + budget: BudgetSpec::default(), + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + children: vec![ + leaf_node("scan-readme"), + leaf_node("scan-config"), + leaf_node("scan-tests"), + ], + })]); + + let mut executor = MockWorkflowExecutor::new().with_max_leaf_steps(1); + let execution = executor.run(&workflow).expect("mock workflow should run"); + + assert_eq!(execution.status, WorkflowRunStatus::BudgetExceeded); + assert_eq!( + execution + .leaf_results + .iter() + .map(|result| (result.leaf_id.as_str(), result.status)) + .collect::>(), + vec![ + ("scan-readme", WorkflowRunStatus::Succeeded), + ("scan-config", WorkflowRunStatus::BudgetExceeded) + ] + ); + assert_eq!( + execution.branch_results[0].status, + WorkflowRunStatus::BudgetExceeded + ); + } + + #[test] + fn mock_executor_honors_zero_step_leaf_budget() { + let workflow = workflow_spec(vec![WorkflowNode::BranchSet(BranchSpec { + id: "verify".to_string(), + description: None, + parallel: false, + budget: BudgetSpec::default(), + permissions: PermissionSpec::default(), + model_policy: ModelPolicy::default(), + children: vec![ + leaf_node_with_budget( + "run-tests", + BudgetSpec { + max_steps: Some(0), + timeout_secs: None, + max_parallel: None, + }, + ), + leaf_node("summarize"), + ], + })]); + + let mut executor = MockWorkflowExecutor::new(); + let execution = executor.run(&workflow).expect("mock workflow should run"); + + assert_eq!(execution.status, WorkflowRunStatus::BudgetExceeded); + assert_eq!(execution.leaf_results.len(), 1); + assert_eq!( + execution.leaf_results[0].status, + WorkflowRunStatus::BudgetExceeded + ); + assert!( + execution.leaf_results[0] + .output + .as_deref() + .unwrap_or_default() + .contains("budget exhausted") + ); + } + #[test] fn loop_until_stops_on_pass() { let workflow = workflow_spec(vec![WorkflowNode::LoopUntil(LoopUntilSpec {