From 711635717335114a9a7a70eee615f3f0312e40fd Mon Sep 17 00:00:00 2001 From: Bhanu Dahiya Date: Thu, 2 Apr 2026 05:14:35 +0530 Subject: [PATCH 1/2] fix(foundation): add execution event parity for workflow lifecycle --- .../src/workflow/execution_event.rs | 21 +- .../mofa-foundation/src/workflow/executor.rs | 387 ++++++++++++++---- 2 files changed, 335 insertions(+), 73 deletions(-) diff --git a/crates/mofa-foundation/src/workflow/execution_event.rs b/crates/mofa-foundation/src/workflow/execution_event.rs index 5b04db29d..020dcb49b 100644 --- a/crates/mofa-foundation/src/workflow/execution_event.rs +++ b/crates/mofa-foundation/src/workflow/execution_event.rs @@ -90,11 +90,24 @@ pub enum ExecutionEvent { total_duration_ms: u64, }, - /// Checkpoint created during workflow execution - CheckpointCreated { - label: String, + /// Workflow execution paused (e.g., waiting for human input) + WorkflowPaused { + workflow_id: String, + node_id: String, + reason: String, + paused_at: u64, }, + /// Workflow execution resumed after pause/checkpoint + WorkflowResumed { + workflow_id: String, + resumed_from: String, + resumed_at: u64, + }, + + /// Checkpoint created during workflow execution + CheckpointCreated { label: String }, + /// Retry attempt for a node NodeRetrying { node_id: String, @@ -245,7 +258,7 @@ mod tests { let envelope = ExecutionEventEnvelope::new(event); let serialized = serde_json::to_string_pretty(&envelope).unwrap(); - + assert!(serialized.contains("WorkflowCompleted")); assert!(serialized.contains("schema_version")); } diff --git a/crates/mofa-foundation/src/workflow/executor.rs b/crates/mofa-foundation/src/workflow/executor.rs index d08b5444d..c07c3a80c 100644 --- a/crates/mofa-foundation/src/workflow/executor.rs +++ b/crates/mofa-foundation/src/workflow/executor.rs @@ -180,16 +180,17 @@ impl WorkflowExecutor { // Create steps from completed nodes for (nid, output) in node_outputs { if let Some(status) = node_statuses.get(&nid) - && matches!(status, 
super::state::NodeStatus::Completed) { - steps.push(ExecutionStep { - step_id: nid.clone(), - step_type: "workflow_node".to_string(), - timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, - input: None, - output: serde_json::to_value(&output).ok(), - metadata: HashMap::new(), - }); - } + && matches!(status, super::state::NodeStatus::Completed) + { + steps.push(ExecutionStep { + step_id: nid.clone(), + step_type: "workflow_node".to_string(), + timestamp_ms: chrono::Utc::now().timestamp_millis() as u64, + input: None, + output: serde_json::to_value(&output).ok(), + metadata: HashMap::new(), + }); + } } // Add current node step @@ -242,6 +243,63 @@ impl WorkflowExecutor { } } + async fn emit_workflow_resumed(&self, workflow_id: &str, resumed_from: &str) { + let resumed_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.emit_event(ExecutionEvent::WorkflowResumed { + workflow_id: workflow_id.to_string(), + resumed_from: resumed_from.to_string(), + resumed_at, + }) + .await; + } + + async fn emit_node_outcome( + &self, + node_id: &str, + output: Option<&WorkflowValue>, + error: Option<&str>, + duration_ms: u64, + retry_count: u32, + max_retries: u32, + ) { + if retry_count > 0 { + let max_attempts = max_retries.saturating_add(1); + for attempt in 1..=retry_count { + let is_last_retry = attempt == retry_count; + self.emit_event(ExecutionEvent::NodeRetrying { + node_id: node_id.to_string(), + attempt, + max_attempts, + last_error: if is_last_retry { + error.map(ToString::to_string) + } else { + None + }, + }) + .await; + } + } + + if let Some(err) = error { + self.emit_event(ExecutionEvent::NodeFailed { + node_id: node_id.to_string(), + error: err.to_string(), + duration_ms, + }) + .await; + } else { + self.emit_event(ExecutionEvent::NodeCompleted { + node_id: node_id.to_string(), + output: output.and_then(|v| serde_json::to_value(v).ok()), + duration_ms, + }) + .await; + } + } + /// 
发送外部事件 /// Send external event pub async fn send_external_event(&self, event_type: &str, data: WorkflowValue) { @@ -379,7 +437,7 @@ impl WorkflowExecutor { }) .await; } - _ => { + WorkflowStatus::Completed => { self.emit_event(ExecutionEvent::WorkflowCompleted { workflow_id: graph.id.clone(), final_output: None, @@ -387,6 +445,7 @@ impl WorkflowExecutor { }) .await; } + _ => {} } // Emit debug telemetry: WorkflowEnd @@ -412,49 +471,45 @@ impl WorkflowExecutor { "Resuming workflow {} from node {} with human input", graph.id, waiting_node_id ); + self.emit_workflow_resumed(&graph.id, waiting_node_id).await; // Check if this was a unified HITL review (check for review_id in variables) if let Some(review_id_value) = ctx.get_variable("review_id").await && let WorkflowValue::String(ref review_id_str) = review_id_value - && let Some(ref review_handler) = self.review_handler { - use mofa_kernel::hitl::ReviewRequestId; - let review_id = ReviewRequestId::new(review_id_str.clone()); - - // Check if review is approved - match review_handler.is_approved(&review_id).await { - Ok(true) => { - info!( - "Review {} approved, proceeding with workflow", - review_id_str - ); - } - Ok(false) => { - // Check if rejected - if let Ok(Some(response)) = - review_handler.get_review_response(&review_id).await - { - match response { - mofa_kernel::hitl::ReviewResponse::Rejected { - reason, .. 
- } => { - return Err(format!("Review rejected: {}", reason)); - } - _ => { - return Err(format!( - "Review {} not approved", - review_id_str - )); - } - } - } else { - return Err(format!("Review {} not yet resolved", review_id_str)); + && let Some(ref review_handler) = self.review_handler + { + use mofa_kernel::hitl::ReviewRequestId; + let review_id = ReviewRequestId::new(review_id_str.clone()); + + // Check if review is approved + match review_handler.is_approved(&review_id).await { + Ok(true) => { + info!( + "Review {} approved, proceeding with workflow", + review_id_str + ); + } + Ok(false) => { + // Check if rejected + if let Ok(Some(response)) = review_handler.get_review_response(&review_id).await + { + match response { + mofa_kernel::hitl::ReviewResponse::Rejected { reason, .. } => { + return Err(format!("Review rejected: {}", reason)); + } + _ => { + return Err(format!("Review {} not approved", review_id_str)); } } - Err(e) => { - warn!("Failed to check review status: {}, proceeding anyway", e); - } + } else { + return Err(format!("Review {} not yet resolved", review_id_str)); } } + Err(e) => { + warn!("Failed to check review status: {}, proceeding anyway", e); + } + } + } // Calculate wait time if let Some(paused_at) = *ctx.paused_at.read().await { @@ -536,7 +591,7 @@ impl WorkflowExecutor { }) .await; } - _ => { + WorkflowStatus::Completed => { self.emit_event(ExecutionEvent::WorkflowCompleted { workflow_id: graph.id.clone(), final_output: None, @@ -544,6 +599,7 @@ impl WorkflowExecutor { }) .await; } + _ => {} } self.emit_debug(DebugEvent::WorkflowEnd { @@ -574,6 +630,11 @@ impl WorkflowExecutor { .as_millis() as u64, }) .await; + self.emit_workflow_resumed( + &graph.id, + &format!("checkpoint:{}", checkpoint.execution_id), + ) + .await; info!( "Resuming workflow execution: {} ({} from checkpoint {})", @@ -671,7 +732,7 @@ impl WorkflowExecutor { }) .await; } - _ => { + WorkflowStatus::Completed => { self.emit_event(ExecutionEvent::WorkflowCompleted { 
workflow_id: graph.id.clone(), final_output: None, @@ -679,6 +740,7 @@ impl WorkflowExecutor { }) .await; } + _ => {} } // Emit debug telemetry: WorkflowEnd (was missing in resume path) @@ -776,6 +838,16 @@ impl WorkflowExecutor { ctx.set_node_status(&current_node_id, NodeStatus::Waiting) .await; record.status = WorkflowStatus::Paused; + self.emit_event(ExecutionEvent::WorkflowPaused { + workflow_id: graph.id.clone(), + node_id: current_node_id.clone(), + reason: "hitl_review".to_string(), + paused_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + }) + .await; // Store review ID in context variables for later retrieval ctx.set_variable( @@ -807,6 +879,16 @@ impl WorkflowExecutor { ctx.set_node_status(&current_node_id, NodeStatus::Waiting) .await; record.status = WorkflowStatus::Paused; + self.emit_event(ExecutionEvent::WorkflowPaused { + workflow_id: graph.id.clone(), + node_id: current_node_id.clone(), + reason: "wait_node".to_string(), + paused_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + }) + .await; return Ok(WorkflowValue::Null); } @@ -833,6 +915,7 @@ impl WorkflowExecutor { }) .await; + let mut node_retry_count: u32 = 0; let result = match node.node_type() { NodeType::Parallel => { self.execute_parallel(graph, ctx, node, current_input.clone(), record) @@ -845,9 +928,8 @@ impl WorkflowExecutor { } NodeType::Wait => self.execute_wait(ctx, node, current_input.clone()).await, _ => { - let node_timeout = std::time::Duration::from_millis( - self.config.node_timeout_ms, - ); + let node_timeout = + std::time::Duration::from_millis(self.config.node_timeout_ms); let result = match tokio::time::timeout( node_timeout, node.execute(ctx, current_input.clone()), @@ -862,28 +944,16 @@ impl WorkflowExecutor { ); NodeResult::failed( &current_node_id, - &format!( - "Node timed out after {:?}", - node_timeout - ), + &format!("Node timed out after 
{:?}", node_timeout), node_timeout.as_millis() as u64, ) } }; + node_retry_count = result.retry_count; ctx.set_node_output(&current_node_id, result.output.clone()) .await; ctx.set_node_status(&current_node_id, result.status.clone()) .await; - let node_end_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - self.emit_event(ExecutionEvent::NodeCompleted { - node_id: current_node_id.clone(), - output: serde_json::to_value(&result.output).ok(), - duration_ms: node_end_ms.saturating_sub(start_time), - }) - .await; if result.status.is_success() { Ok(result.output) } else { @@ -909,6 +979,21 @@ impl WorkflowExecutor { }) .await; + if let Err(e) = &result { + ctx.set_node_status(&current_node_id, NodeStatus::Failed(e.clone())) + .await; + } + + self.emit_node_outcome( + &current_node_id, + result.as_ref().ok(), + result.as_ref().err().map(String::as_str), + end_time.saturating_sub(start_time), + node_retry_count, + node.config.retry_policy.max_retries, + ) + .await; + // Record node execution record.node_records.push(NodeExecutionRecord { node_id: current_node_id.clone(), @@ -918,7 +1003,7 @@ impl WorkflowExecutor { .get_node_status(&current_node_id) .await .unwrap_or(NodeStatus::Pending), - retry_count: 0, + retry_count: node_retry_count, }); // 检查点 @@ -1952,9 +2037,7 @@ mod tests { graph.add_node(WorkflowNode::task( "fast_task", "Fast Task", - |_ctx, _input| async move { - Ok(WorkflowValue::String("fast".to_string())) - }, + |_ctx, _input| async move { Ok(WorkflowValue::String("fast".to_string())) }, )); graph.add_node(WorkflowNode::end("end")); graph.connect("start", "fast_task"); @@ -2206,4 +2289,170 @@ mod tests { *events ); } + + fn drain_events(mut rx: tokio::sync::mpsc::Receiver<ExecutionEvent>) -> Vec<ExecutionEvent> { + let mut events = Vec::new(); + while let Ok(event) = rx.try_recv() { + events.push(event); + } + events + } + + #[tokio::test] + async fn test_emits_retry_and_completion_events_for_flaky_node() { + use std::sync::Arc; + use 
std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::sync::mpsc; + + let attempts = Arc::new(AtomicUsize::new(0)); + let attempts_clone = attempts.clone(); + + let (tx, rx) = mpsc::channel(256); + let executor = WorkflowExecutor::new(ExecutorConfig::default()).with_event_sender(tx); + + let mut graph = WorkflowGraph::new("retry_events_wf", "Retry events workflow"); + graph.add_node(WorkflowNode::start("start")); + let flaky = WorkflowNode::task("flaky", "Flaky node", move |_ctx, _input| { + let attempts = attempts_clone.clone(); + async move { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + if attempt < 2 { + Err("transient failure".to_string()) + } else { + Ok(WorkflowValue::String("ok".to_string())) + } + } + }) + .with_retry(crate::workflow::node::RetryPolicy { + max_retries: 2, + ..Default::default() + }); + graph.add_node(flaky); + graph.add_node(WorkflowNode::end("end")); + graph.connect("start", "flaky"); + graph.connect("flaky", "end"); + + let record = executor + .execute(&graph, WorkflowValue::Null) + .await + .expect("workflow should succeed"); + assert!(matches!(record.status, WorkflowStatus::Completed)); + + let events = drain_events(rx); + let retry_events: Vec<&ExecutionEvent> = events + .iter() + .filter( + |e| matches!(e, ExecutionEvent::NodeRetrying { node_id, .. } if node_id == "flaky"), + ) + .collect(); + assert_eq!( + retry_events.len(), + 2, + "expected 2 retry events for flaky node, got: {:?}", + events + ); + assert!( + events.iter().any( + |e| matches!(e, ExecutionEvent::NodeCompleted { node_id, .. } if node_id == "flaky") + ), + "expected NodeCompleted event for flaky node, got: {:?}", + events + ); + assert!( + !events.iter().any( + |e| matches!(e, ExecutionEvent::NodeFailed { node_id, .. 
} if node_id == "flaky") + ), + "did not expect NodeFailed for successful flaky node, got: {:?}", + events + ); + } + + #[tokio::test] + async fn test_emits_node_failed_event_on_terminal_failure() { + use tokio::sync::mpsc; + + let (tx, rx) = mpsc::channel(256); + let executor = WorkflowExecutor::new(ExecutorConfig::default()).with_event_sender(tx); + + let mut graph = WorkflowGraph::new("failed_event_wf", "Failed event workflow"); + graph.add_node(WorkflowNode::start("start")); + let failing = WorkflowNode::task("always_fail", "Always fail", |_ctx, _input| async move { + Err("hard failure".to_string()) + }) + .with_retry(crate::workflow::node::RetryPolicy { + max_retries: 1, + ..Default::default() + }); + graph.add_node(failing); + graph.add_node(WorkflowNode::end("end")); + graph.connect("start", "always_fail"); + graph.connect("always_fail", "end"); + + let record = executor + .execute(&graph, WorkflowValue::Null) + .await + .expect("executor should return record even on failure"); + assert!(matches!(record.status, WorkflowStatus::Failed(_))); + + let events = drain_events(rx); + assert!( + events + .iter() + .any(|e| matches!(e, ExecutionEvent::NodeRetrying { node_id, .. } if node_id == "always_fail")), + "expected NodeRetrying event for always_fail, got: {:?}", + events + ); + assert!( + events + .iter() + .any(|e| matches!(e, ExecutionEvent::NodeFailed { node_id, .. 
} if node_id == "always_fail")), + "expected NodeFailed event for always_fail, got: {:?}", + events + ); + } + + #[tokio::test] + async fn test_emits_paused_and_resumed_lifecycle_events() { + use tokio::sync::mpsc; + + let (tx, rx) = mpsc::channel(256); + let executor = WorkflowExecutor::new(ExecutorConfig::default()).with_event_sender(tx); + let graph = make_wait_graph("pause_resume_events_wf"); + + let first = executor + .execute(&graph, WorkflowValue::Null) + .await + .expect("first run should succeed"); + assert!(matches!(first.status, WorkflowStatus::Paused)); + let ctx = first + .context + .expect("paused workflow should include context"); + + let resumed = executor + .resume_with_human_input( + &graph, + ctx, + "wait_review", + WorkflowValue::String("approved".to_string()), + ) + .await + .expect("resume should succeed"); + assert!(matches!(resumed.status, WorkflowStatus::Completed)); + + let events = drain_events(rx); + assert!( + events + .iter() + .any(|e| matches!(e, ExecutionEvent::WorkflowPaused { workflow_id, .. } if workflow_id == "pause_resume_events_wf")), + "expected WorkflowPaused event, got: {:?}", + events + ); + assert!( + events + .iter() + .any(|e| matches!(e, ExecutionEvent::WorkflowResumed { workflow_id, .. } if workflow_id == "pause_resume_events_wf")), + "expected WorkflowResumed event, got: {:?}", + events + ); + } } From cc2e44a991d662cc9bd015f6590543aad2046580 Mon Sep 17 00:00:00 2001 From: Bhanu Dahiya Date: Thu, 2 Apr 2026 07:12:52 +0530 Subject: [PATCH 2/2] chore(ci): retrigger ubuntu test run