diff --git a/bento/crates/workflow/src/tasks/join.rs b/bento/crates/workflow/src/tasks/join.rs index ac2ac121c..2758570c5 100644 --- a/bento/crates/workflow/src/tasks/join.rs +++ b/bento/crates/workflow/src/tasks/join.rs @@ -9,7 +9,7 @@ use crate::{ tasks::{RECUR_RECEIPT_PATH, deserialize_obj, serialize_obj}, }; use anyhow::{Context, Result}; -use risc0_zkvm::{ReceiptClaim, SuccinctReceipt}; +use risc0_zkvm::{ReceiptClaim, SegmentReceipt, SuccinctReceipt}; use uuid::Uuid; use workflow_common::JoinReq; @@ -23,15 +23,52 @@ pub async fn join(agent: &Agent, job_id: &Uuid, request: &JoinReq) -> Result<()> let left_path_key = format!("{recur_receipts_prefix}:{}", request.left); let right_path_key = format!("{recur_receipts_prefix}:{}", request.right); - let (left_receipt, right_receipt): (Vec, Vec) = + let (left_receipt_data, right_receipt_data): (Vec, Vec) = conn.mget::<_, (Vec, Vec)>(&[&left_path_key, &right_path_key]).await.with_context( || format!("failed to get receipts for keys: {left_path_key}, {right_path_key}"), )?; - let left_receipt: SuccinctReceipt = - deserialize_obj(&left_receipt).context("Failed to deserialize left receipt")?; - let right_receipt: SuccinctReceipt = - deserialize_obj(&right_receipt).context("Failed to deserialize right receipt")?; + // Handle each receipt independently - they could be mixed types + let prover = agent.prover.as_ref().context("Missing prover from resolve task")?; + let left_idx = request.left; + let right_idx = request.right; + + let (left_receipt, right_receipt) = tokio::try_join!( + async { + match deserialize_obj::(&left_receipt_data) { + Ok(segment) => { + let receipt = prover + .lift(&segment) + .with_context(|| format!("Failed to lift segment {left_idx}"))?; + tracing::debug!("lifting complete {job_id} - {left_idx}"); + Ok::<_, anyhow::Error>(receipt) + } + Err(_) => { + let receipt: SuccinctReceipt = + deserialize_obj(&left_receipt_data) + .context("Failed to deserialize left receipt")?; + Ok(receipt) + } + } + }, + async { + match deserialize_obj::(&right_receipt_data) { + Ok(segment) => { + let receipt = prover + .lift(&segment) + .with_context(|| format!("Failed to lift segment {right_idx}"))?; + tracing::debug!("lifting complete {job_id} - {right_idx}"); + Ok::<_, anyhow::Error>(receipt) + } + Err(_) => { + let receipt: SuccinctReceipt = + deserialize_obj(&right_receipt_data) + .context("Failed to deserialize right receipt")?; + Ok(receipt) + } + } + } + )?; tracing::trace!("Joining {job_id} - {} + {} -> {}", request.left, request.right, request.idx); diff --git a/bento/crates/workflow/src/tasks/join_povw.rs b/bento/crates/workflow/src/tasks/join_povw.rs index ab6542dcb..ea5e4b1b4 100644 --- a/bento/crates/workflow/src/tasks/join_povw.rs +++ b/bento/crates/workflow/src/tasks/join_povw.rs @@ -9,7 +9,7 @@ use crate::{ tasks::{RECUR_RECEIPT_PATH, deserialize_obj, serialize_obj}, }; use anyhow::{Context, Result}; -use risc0_zkvm::{ReceiptClaim, SuccinctReceipt, WorkClaim}; +use risc0_zkvm::{ReceiptClaim, SegmentReceipt, SuccinctReceipt, WorkClaim}; use uuid::Uuid; use workflow_common::JoinReq; @@ -29,14 +29,50 @@ pub async fn join_povw(agent: &Agent, job_id: &Uuid, request: &JoinReq) -> Resul format!("failed to get receipts for keys: {left_receipt_key}, {right_receipt_key}") })?; - // Deserialize POVW receipts + // Handle each receipt independently - they could be mixed types + let prover = agent.prover.as_ref().context("Missing prover from POVW join task")?; + let (left_receipt, right_receipt): ( SuccinctReceipt>, SuccinctReceipt>, - ) = ( - deserialize_obj::>>(&left_receipt_bytes)?, - deserialize_obj::>>(&right_receipt_bytes)?, - ); + ) = tokio::try_join!( + async { + match deserialize_obj::(&left_receipt_bytes) { + Ok(segment_receipt) => { + // Successfully deserialized as segment receipt, now lift to POVW + let povw_receipt = prover + .lift_povw(&segment_receipt) + .context("Failed to lift left segment to POVW")?; + Ok::<_, anyhow::Error>(povw_receipt) + } + Err(_) => { + // Failed to deserialize as segment, try as already-lifted POVW receipt + let povw_receipt: SuccinctReceipt> = + deserialize_obj(&left_receipt_bytes) + .context("Failed to deserialize left POVW receipt")?; + Ok(povw_receipt) + } + } + }, + async { + match deserialize_obj::(&right_receipt_bytes) { + Ok(segment_receipt) => { + // Successfully deserialized as segment receipt, now lift to POVW + let povw_receipt = prover + .lift_povw(&segment_receipt) + .context("Failed to lift right segment to POVW")?; + Ok::<_, anyhow::Error>(povw_receipt) + } + Err(_) => { + // Failed to deserialize as segment, try as already-lifted POVW receipt + let povw_receipt: SuccinctReceipt> = + deserialize_obj(&right_receipt_bytes) + .context("Failed to deserialize right POVW receipt")?; + Ok(povw_receipt) + } + } + } + )?; tracing::debug!("Starting POVW join of receipts {} and {}", request.left, request.right); diff --git a/bento/crates/workflow/src/tasks/prove.rs b/bento/crates/workflow/src/tasks/prove.rs index 5142f5287..397dbf836 100644 --- a/bento/crates/workflow/src/tasks/prove.rs +++ b/bento/crates/workflow/src/tasks/prove.rs @@ -9,7 +9,7 @@ use crate::{ tasks::{RECUR_RECEIPT_PATH, SEGMENTS_PATH, deserialize_obj, serialize_obj}, }; use anyhow::{Context, Result}; -use risc0_zkvm::{ReceiptClaim, SuccinctReceipt, WorkClaim}; +use risc0_zkvm::Segment; use uuid::Uuid; use workflow_common::ProveReq; @@ -25,52 +25,37 @@ pub async fn prover(agent: &Agent, job_id: &Uuid, task_id: &str, request: &Prove .get::<_, Vec>(&segment_key) .await .with_context(|| format!("segment data not found for segment key: {segment_key}"))?; - let segment = - deserialize_obj(&segment_vec).context("Failed to deserialize segment data from redis")?; - let segment_receipt = agent - .prover - .as_ref() - .context("Missing prover from prove task")? - .prove_segment(&agent.verifier_ctx, &segment) - .context("Failed to prove segment")?; + // Try to deserialize as segment first, then prove it + let segment_receipt = match deserialize_obj::(&segment_vec) { + Ok(segment) => { + // Successfully deserialized as segment, now prove it + agent + .prover + .as_ref() + .context("Missing prover from prove task")? + .prove_segment(&agent.verifier_ctx, &segment) + .context("Failed to prove segment")? + } + Err(_) => { + // Failed to deserialize as segment, try as already-proven receipt + deserialize_obj(&segment_vec) + .context("Failed to deserialize segment data from redis")? + } + }; tracing::debug!("Completed proof: {job_id} - {index}"); - - tracing::debug!("lifting {job_id} - {index}"); - let output_key = format!("{job_prefix}:{RECUR_RECEIPT_PATH}:{task_id}"); - if agent.is_povw_enabled() { - let lift_receipt: SuccinctReceipt> = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .lift_povw(&segment_receipt) - .with_context(|| format!("Failed to POVW lift segment {index}"))?; - - tracing::debug!("lifting complete {job_id} - {index}"); - - // Write out lifted POVW receipt - let lift_asset = - serialize_obj(&lift_receipt).expect("Failed to serialize the POVW segment"); - redis::set_key_with_expiry(&mut conn, &output_key, lift_asset, Some(agent.args.redis_ttl)) - .await?; - } else { - let lift_receipt: SuccinctReceipt = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .lift(&segment_receipt) - .with_context(|| format!("Failed to lift segment {index}"))?; - - tracing::debug!("lifting complete {job_id} - {index}"); - - // Write out lifted regular receipt - let lift_asset = serialize_obj(&lift_receipt).expect("Failed to serialize the segment"); - redis::set_key_with_expiry(&mut conn, &output_key, lift_asset, Some(agent.args.redis_ttl)) - .await?; - } + let serialized_receipt = + serialize_obj(&segment_receipt).context("Failed to serialize segment receipt")?; + redis::set_key_with_expiry( + &mut conn, + &output_key, + serialized_receipt, + Some(agent.args.redis_ttl), + ) + .await?; Ok(()) } diff --git a/bento/crates/workflow/src/tasks/resolve.rs b/bento/crates/workflow/src/tasks/resolve.rs index ad0316d09..5f548fcd9 100644 --- a/bento/crates/workflow/src/tasks/resolve.rs +++ b/bento/crates/workflow/src/tasks/resolve.rs @@ -10,7 +10,7 @@ use crate::{ }; use anyhow::{Context, Result}; use risc0_zkvm::sha::Digestible; -use risc0_zkvm::{ReceiptClaim, SuccinctReceipt, Unknown}; +use risc0_zkvm::{ReceiptClaim, SegmentReceipt, SuccinctReceipt, Unknown}; use uuid::Uuid; use workflow_common::{KECCAK_RECEIPT_PATH, ResolveReq}; @@ -29,76 +29,86 @@ pub async fn resolver(agent: &Agent, job_id: &Uuid, request: &ResolveReq) -> Res })?; tracing::debug!("Root receipt size: {} bytes", receipt.len()); - let mut conditional_receipt: SuccinctReceipt = deserialize_obj(&receipt)?; + + // Try to deserialize as segment receipt first, then lift it if needed + let mut conditional_receipt: SuccinctReceipt = + match deserialize_obj::(&receipt) { + Ok(segment_receipt) => { + // Successfully deserialized as segment receipt, now lift it + agent + .prover + .as_ref() + .context("Missing prover from resolve task")? + .lift(&segment_receipt) + .context("Failed to lift segment receipt")? + } + Err(_) => { + // Failed to deserialize as segment receipt, try as already-lifted succinct receipt + deserialize_obj(&receipt).context("Failed to deserialize receipt")? + } + }; let mut assumptions_len: Option = None; - if conditional_receipt.claim.clone().as_value()?.output.is_some() { - if let Some(guest_output) = + if conditional_receipt.claim.clone().as_value()?.output.is_some() + && let Some(guest_output) = conditional_receipt.claim.clone().as_value()?.output.as_value()? - { - if !guest_output.assumptions.is_empty() { - let assumptions = guest_output - .assumptions - .as_value() - .context("Failed unwrap the assumptions of the guest output")? - .iter(); - - tracing::debug!("Resolving {} assumption(s)", assumptions.len()); - assumptions_len = - Some(assumptions.len().try_into().context("Failed to convert to u64")?); - - let mut union_claim = String::new(); - if let Some(idx) = request.union_max_idx { - let union_root_receipt_key = - format!("{job_prefix}:{KECCAK_RECEIPT_PATH}:{idx}"); - tracing::debug!( - "Deserializing union_root_receipt_key: {union_root_receipt_key}" - ); - let union_receipt: Vec = conn.get(&union_root_receipt_key).await?; - let union_receipt: SuccinctReceipt = - deserialize_obj(&union_receipt) - .context("Failed to deserialize to SuccinctReceipt type")?; - union_claim = union_receipt.claim.digest().to_string(); - - // Resolve union receipt - tracing::debug!("Resolving union claim digest: {union_claim}"); - conditional_receipt = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .resolve(&conditional_receipt, &union_receipt) - .context("Failed to resolve the union receipt")?; - } - - for assumption in assumptions { - let assumption_claim = assumption.as_value()?.claim.to_string(); - if assumption_claim.eq(&union_claim) { - tracing::debug!("Skipping already resolved union claim: {union_claim}"); - continue; - } - let assumption_key = format!("{receipts_key}:{assumption_claim}"); - tracing::debug!("Deserializing assumption with key: {assumption_key}"); - let assumption_bytes: Vec = conn - .get(&assumption_key) - .await - .context("corroborating receipt not found: key {assumption_key}")?; - - let assumption_receipt: SuccinctReceipt = - deserialize_obj(&assumption_bytes).with_context(|| { - format!("could not deserialize assumption receipt: {assumption_key}") - })?; - - // Resolve - conditional_receipt = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .resolve(&conditional_receipt, &assumption_receipt) - .context("Failed to resolve the conditional receipt")?; - } - tracing::debug!("Resolve complete for job_id: {job_id}"); + && !guest_output.assumptions.is_empty() + { + let assumptions = guest_output + .assumptions + .as_value() + .context("Failed unwrap the assumptions of the guest output")? + .iter(); + + tracing::debug!("Resolving {} assumption(s)", assumptions.len()); + assumptions_len = Some(assumptions.len().try_into().context("Failed to convert to u64")?); + + let mut union_claim = String::new(); + if let Some(idx) = request.union_max_idx { + let union_root_receipt_key = format!("{job_prefix}:{KECCAK_RECEIPT_PATH}:{idx}"); + tracing::debug!("Deserializing union_root_receipt_key: {union_root_receipt_key}"); + let union_receipt: Vec = conn.get(&union_root_receipt_key).await?; + let union_receipt: SuccinctReceipt = deserialize_obj(&union_receipt) + .context("Failed to deserialize to SuccinctReceipt type")?; + union_claim = union_receipt.claim.digest().to_string(); + + // Resolve union receipt + tracing::debug!("Resolving union claim digest: {union_claim}"); + conditional_receipt = agent + .prover + .as_ref() + .context("Missing prover from resolve task")? + .resolve(&conditional_receipt, &union_receipt) + .context("Failed to resolve the union receipt")?; + } + + for assumption in assumptions { + let assumption_claim = assumption.as_value()?.claim.to_string(); + if assumption_claim.eq(&union_claim) { + tracing::debug!("Skipping already resolved union claim: {union_claim}"); + continue; } + let assumption_key = format!("{receipts_key}:{assumption_claim}"); + tracing::debug!("Deserializing assumption with key: {assumption_key}"); + let assumption_bytes: Vec = conn + .get(&assumption_key) + .await + .context("corroborating receipt not found: key {assumption_key}")?; + + let assumption_receipt: SuccinctReceipt = deserialize_obj(&assumption_bytes) + .with_context(|| { + format!("could not deserialize assumption receipt: {assumption_key}") + })?; + + // Resolve + conditional_receipt = agent + .prover + .as_ref() + .context("Missing prover from resolve task")? + .resolve(&conditional_receipt, &assumption_receipt) + .context("Failed to resolve the conditional receipt")?; } + tracing::debug!("Resolve complete for job_id: {job_id}"); } // Write out the resolved receipt diff --git a/bento/crates/workflow/src/tasks/resolve_povw.rs b/bento/crates/workflow/src/tasks/resolve_povw.rs index 03c8189cd..37a21c83c 100644 --- a/bento/crates/workflow/src/tasks/resolve_povw.rs +++ b/bento/crates/workflow/src/tasks/resolve_povw.rs @@ -10,7 +10,9 @@ use crate::{ }; use anyhow::{Context, Result}; use risc0_zkvm::sha::Digestible; -use risc0_zkvm::{GenericReceipt, ReceiptClaim, SuccinctReceipt, Unknown, WorkClaim}; +use risc0_zkvm::{ + GenericReceipt, ReceiptClaim, SegmentReceipt, SuccinctReceipt, Unknown, WorkClaim, +}; use uuid::Uuid; use workflow_common::{KECCAK_RECEIPT_PATH, ResolveReq, s3::WORK_RECEIPTS_BUCKET_DIR}; @@ -33,110 +35,122 @@ pub async fn resolve_povw( tracing::debug!("Root receipt size: {} bytes", receipt.len()); - // Deserialize as POVW receipt + // Try to deserialize as segment receipt first, then lift it to POVW if needed let povw_receipt: SuccinctReceipt> = - deserialize_obj::>>(&receipt) - .context("Failed to deserialize as POVW receipt")?; + match deserialize_obj::(&receipt) { + Ok(segment_receipt) => { + // Successfully deserialized as segment receipt, now lift it to POVW + agent + .prover + .as_ref() + .context("Missing prover from POVW resolve task")? + .lift_povw(&segment_receipt) + .context("Failed to lift segment receipt to POVW")? + } + Err(_) => { + // Failed to deserialize as segment receipt, try as already-lifted POVW receipt + deserialize_obj::>>(&receipt) + .context("Failed to deserialize as POVW receipt")? + } + }; // Unwrap the POVW receipt to get the ReceiptClaim for processing let mut conditional_receipt: SuccinctReceipt = agent.prover.as_ref().unwrap().unwrap_povw(&povw_receipt).context("POVW unwrap failed")?; let mut assumptions_len: Option = None; - if conditional_receipt.claim.clone().as_value()?.output.is_some() { - if let Some(guest_output) = + if conditional_receipt.claim.clone().as_value()?.output.is_some() + && let Some(guest_output) = conditional_receipt.claim.clone().as_value()?.output.as_value()? - { - if !guest_output.assumptions.is_empty() { - let assumptions = guest_output - .assumptions - .as_value() - .context("Failed unwrap the assumptions of the guest output")? - .iter(); - - tracing::debug!("Resolving {} assumption(s)", assumptions.len()); - assumptions_len = - Some(assumptions.len().try_into().context("Failed to convert to u64")?); - - let mut union_claim = String::new(); - if let Some(idx) = request.union_max_idx { - let union_root_receipt_key = - format!("{job_prefix}:{KECCAK_RECEIPT_PATH}:{idx}"); - tracing::debug!( - "Deserializing union_root_receipt_key: {union_root_receipt_key}" - ); - let union_receipt: Vec = conn.get(&union_root_receipt_key).await?; - - // Debug: Check the size and content of the union receipt - tracing::debug!("Union receipt size: {} bytes", union_receipt.len()); - if union_receipt.is_empty() { - return Err(anyhow::anyhow!( - "Union receipt is empty for key: {}", - union_root_receipt_key - )); - } - - let union_receipt: SuccinctReceipt = deserialize_obj(&union_receipt) - .with_context(|| { - format!( - "Failed to deserialize union receipt (size: {} bytes) from key: {}", - union_receipt.len(), - union_root_receipt_key - ) - })?; - union_claim = union_receipt.claim.digest().to_string(); - - // Resolve union receipt - tracing::debug!("Resolving union claim digest: {union_claim}"); - conditional_receipt = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .resolve(&conditional_receipt, &union_receipt) - .context("Failed to resolve the union receipt")?; - } - - for assumption in assumptions { - let assumption_claim = assumption.as_value()?.claim.to_string(); - if assumption_claim.eq(&union_claim) { - tracing::debug!("Skipping already resolved union claim: {union_claim}"); - continue; - } - let assumption_key = - format!("{job_prefix}:{RECUR_RECEIPT_PATH}:{assumption_claim}"); - tracing::debug!("Deserializing assumption with key: {assumption_key}"); - let assumption_bytes: Vec = conn - .get(&assumption_key) - .await - .context("corroborating receipt not found: key {assumption_key}")?; - - // Debug: Check the size and content of the assumption receipt - tracing::debug!( - "Assumption receipt size: {} bytes for key: {}", - assumption_bytes.len(), - assumption_key - ); - if assumption_bytes.is_empty() { - return Err(anyhow::anyhow!( - "Assumption receipt is empty for key: {}", - assumption_key - )); - } - - let assumption_receipt = deserialize_obj(&assumption_bytes) - .with_context(|| format!("Failed to deserialize assumption receipt (size: {} bytes) from key: {}", assumption_bytes.len(), assumption_key))?; - - // Resolve - conditional_receipt = agent - .prover - .as_ref() - .context("Missing prover from resolve task")? - .resolve(&conditional_receipt, &assumption_receipt) - .context("Failed to resolve the conditional receipt")?; - } - tracing::debug!("Resolve complete for job_id: {job_id}"); + && !guest_output.assumptions.is_empty() + { + let assumptions = guest_output + .assumptions + .as_value() + .context("Failed unwrap the assumptions of the guest output")? + .iter(); + + tracing::debug!("Resolving {} assumption(s)", assumptions.len()); + assumptions_len = Some(assumptions.len().try_into().context("Failed to convert to u64")?); + + let mut union_claim = String::new(); + if let Some(idx) = request.union_max_idx { + let union_root_receipt_key = format!("{job_prefix}:{KECCAK_RECEIPT_PATH}:{idx}"); + tracing::debug!("Deserializing union_root_receipt_key: {union_root_receipt_key}"); + let union_receipt: Vec = conn.get(&union_root_receipt_key).await?; + + // Debug: Check the size and content of the union receipt + tracing::debug!("Union receipt size: {} bytes", union_receipt.len()); + if union_receipt.is_empty() { + return Err(anyhow::anyhow!( + "Union receipt is empty for key: {}", + union_root_receipt_key + )); + } + + let union_receipt: SuccinctReceipt = deserialize_obj(&union_receipt) + .with_context(|| { + format!( + "Failed to deserialize union receipt (size: {} bytes) from key: {}", + union_receipt.len(), + union_root_receipt_key + ) + })?; + union_claim = union_receipt.claim.digest().to_string(); + + // Resolve union receipt + tracing::debug!("Resolving union claim digest: {union_claim}"); + conditional_receipt = agent + .prover + .as_ref() + .context("Missing prover from resolve task")? + .resolve(&conditional_receipt, &union_receipt) + .context("Failed to resolve the union receipt")?; + } + + for assumption in assumptions { + let assumption_claim = assumption.as_value()?.claim.to_string(); + if assumption_claim.eq(&union_claim) { + tracing::debug!("Skipping already resolved union claim: {union_claim}"); + continue; + } + let assumption_key = format!("{job_prefix}:{RECUR_RECEIPT_PATH}:{assumption_claim}"); + tracing::debug!("Deserializing assumption with key: {assumption_key}"); + let assumption_bytes: Vec = conn + .get(&assumption_key) + .await + .context("corroborating receipt not found: key {assumption_key}")?; + + // Debug: Check the size and content of the assumption receipt + tracing::debug!( + "Assumption receipt size: {} bytes for key: {}", + assumption_bytes.len(), + assumption_key + ); + if assumption_bytes.is_empty() { + return Err(anyhow::anyhow!( + "Assumption receipt is empty for key: {}", + assumption_key + )); } + + let assumption_receipt = deserialize_obj(&assumption_bytes).with_context(|| { + format!( + "Failed to deserialize assumption receipt (size: {} bytes) from key: {}", + assumption_bytes.len(), + assumption_key + ) + })?; + + // Resolve + conditional_receipt = agent + .prover + .as_ref() + .context("Missing prover from resolve task")? + .resolve(&conditional_receipt, &assumption_receipt) + .context("Failed to resolve the conditional receipt")?; } + tracing::debug!("Resolve complete for job_id: {job_id}"); } // Write out the resolved receipt @@ -174,6 +188,7 @@ pub async fn resolve_povw( // Only include POVW fields if they are actually set and non-empty let mut metadata_fields = serde_json::Map::new(); metadata_fields.insert("job_id".to_string(), serde_json::Value::String(job_id.to_string())); + if let Ok(log_id) = std::env::var("POVW_LOG_ID") { metadata_fields.insert("povw_log_id".to_string(), serde_json::Value::String(log_id)); }