diff --git a/crates/libs/core/src/runtime/executor.rs b/crates/libs/core/src/runtime/executor.rs index 1b76d1ac..66c807a2 100644 --- a/crates/libs/core/src/runtime/executor.rs +++ b/crates/libs/core/src/runtime/executor.rs @@ -8,23 +8,31 @@ use std::future::Future; use tokio::{runtime::Handle, sync::mpsc::channel}; use tracing::info; +use crate::error::FabricErrorCode; + // Executor is used by rs to post jobs to execute in the background // Sync is needed due to we use the executor across await boundary. pub trait Executor: Clone + Sync + Send + 'static { // Required functions - // spawns the task to run in background - fn spawn(&self, future: F) + /// spawns the task to run in background, and returns a join handle + /// where the future's result can be awaited. + /// If the future panics, the join handle should return an error code. + /// This is primarily used by mssf Bridge to execute user app async callbacks/notifications. + /// User app impl future may panic, and mssf propagates panic as an error in JoinHandle + /// to SF. + fn spawn(&self, future: F) -> impl JoinHandle where - F: Future + Send + 'static; + F: Future + Send + 'static, + F::Output: Send; - // run the future on the executor until completion. + /// run the future on the executor until completion. fn block_on(&self, future: F) -> F::Output; // provided functions - // Run the executor and block the current thread until ctrl-c event is - // Received. + /// Run the executor and block the current thread until ctrl-c event is + /// Received. fn run_until_ctrl_c(&self) { info!("DefaultExecutor: setting up ctrl-c event."); // set ctrc event @@ -44,11 +52,24 @@ pub trait Executor: Clone + Sync + Send + 'static { } } +/// Handle can be awaited to get the success status of the task. +/// The handle is primarily needed to propagate background task error +/// back to SF. +#[trait_variant::make(JoinHandle: Send)] +pub trait LocalJoinHandle { + async fn join(self) -> crate::Result; +} + #[derive(Clone)] pub struct DefaultExecutor { rt: Handle, } +/// Default implementation of the JoinHandle using tokio +pub struct DefaultJoinHandle { + inner: tokio::task::JoinHandle, +} + impl DefaultExecutor { pub fn new(rt: Handle) -> DefaultExecutor { DefaultExecutor { rt } @@ -56,28 +77,13 @@ impl DefaultExecutor { } impl Executor for DefaultExecutor { - fn spawn(&self, future: F) + fn spawn(&self, future: F) -> impl JoinHandle where F: Future + Send + 'static, + F::Output: Send, { - let h = self.rt.spawn(async move { - future.await; - }); - - // Monitor user future. - // If user task has panic, exit the process. - // TODO: expose a config to control this behavior. - // It is observed that if user task panics, sf operation are stuck. - self.rt.spawn(async move { - let ok = h.await; - if ok.is_err() { - info!( - "DefaultExecutor: User spawned future paniced {}", - ok.unwrap_err() - ); - std::process::exit(1); - } - }); + let h = self.rt.spawn(future); + DefaultJoinHandle:: { inner: h } } fn block_on(&self, future: F) -> F::Output { @@ -85,6 +91,26 @@ impl Executor for DefaultExecutor { } } +impl JoinHandle for DefaultJoinHandle { + async fn join(self) -> crate::Result { + match self.inner.await { + Ok(x) => Ok(x), + Err(e) => { + let e = if e.is_cancelled() { + // we never cancel in executor + FabricErrorCode::E_ABORT + } else if e.is_panic() { + FabricErrorCode::E_UNEXPECTED + } else { + FabricErrorCode::E_FAIL + }; + tracing::error!("DefaultJoinHandle: background task failed: {e}"); + Err(e.into()) + } + } + } +} + #[cfg(test)] mod test { use super::DefaultExecutor; diff --git a/crates/libs/core/src/sync/cancel.rs b/crates/libs/core/src/sync/cancel.rs index cd5495b2..1ba66a69 100644 --- a/crates/libs/core/src/sync/cancel.rs +++ b/crates/libs/core/src/sync/cancel.rs @@ -16,7 +16,10 @@ use mssf_com::FabricCommon::{ pub use tokio_util::sync::CancellationToken; use windows_core::{implement, AsImpl}; -use crate::{error::FabricErrorCode, runtime::executor::Executor}; +use crate::{ + error::FabricErrorCode, + runtime::executor::{Executor, JoinHandle}, +}; /// Async operation context for bridging rust code into SF COM api that supports cancellation. #[implement(IFabricAsyncOperationContext)] @@ -24,7 +27,9 @@ pub struct BridgeContext3 where T: 'static, { - content: Cell>, + /// The task result. Initially it is None. + /// If the task panics, the error is propagated here. + content: Cell>>, /// Indicates the async operation has completed or not. /// This is a memory barrier for making the content available /// from writer thread to the reader thread. It is needed because @@ -66,6 +71,8 @@ where /// This is intended to be used in SF Begin COM api, where /// rust code is spawned in background and the context is returned /// to caller. + /// If the future panics, an error is set in the resulting content, + /// caller will still get callback and receive an error in the End api. /// This api is in some sense unsafe, because the developer needs to ensure /// the following: /// * return type of the future needs to match SF COM api end return type. @@ -79,10 +86,16 @@ where { let self_cp: IFabricAsyncOperationContext = self.into(); let self_cp2 = self_cp.clone(); + let rt_cp = rt.clone(); rt.spawn(async move { - let ok = future.await; + // Run user code in a task and wait on its status. + // If user code panics we propagate the error back to SF. + let task_res = rt_cp.spawn(future).join().await; + // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works. + + // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.). let self_impl: &BridgeContext3 = unsafe { self_cp.as_impl() }; - self_impl.set_content(ok); + self_impl.set_content(task_res); let cb = self_impl.Callback().unwrap(); unsafe { cb.Invoke(&self_cp) }; }); @@ -103,7 +116,7 @@ where /// Set the content for the ctx. /// Marks the ctx as completed. - fn set_content(&self, content: T) { + fn set_content(&self, content: crate::Result) { let prev = self.content.replace(Some(content)); assert!(prev.is_none()); self.set_complete(); @@ -113,7 +126,7 @@ where /// can only be called once after set content. fn consume_content(&self) -> crate::Result { match self.check_complete() { - true => Ok(self.content.take().expect("content is consumed twice.")), + true => self.content.take().expect("content is consumed twice."), false => { if self.token.is_cancelled() { Err(FabricErrorCode::E_ABORT.into()) @@ -395,7 +408,7 @@ where mod test { use std::{ cell::Cell, - sync::{Arc, Mutex}, + sync::{atomic::AtomicBool, Arc, Mutex}, time::Duration, }; @@ -483,6 +496,7 @@ mod test { /// Test Obj for cancellation pub struct MyObj { data: Mutex>, + panic: AtomicBool, } // Implement the test trait @@ -493,6 +507,9 @@ mod test { ignore_cancel: bool, token: Option, ) -> crate::Result { + if self.panic.load(std::sync::atomic::Ordering::Relaxed) { + panic!("test panic is set") + } if delay.is_zero() { // This is needed to make future is breakable in bench test in select tokio::task::yield_now().await; @@ -524,6 +541,9 @@ mod test { delay: Duration, token: Option, ) -> crate::Result<()> { + if self.panic.load(std::sync::atomic::Ordering::Relaxed) { + panic!("test panic is set") + } if delay.is_zero() { // This is needed to make future is breakable in bench test in select tokio::task::yield_now().await; @@ -553,6 +573,7 @@ mod test { pub fn new(data: String) -> Self { Self { data: Mutex::new(Cell::new(data)), + panic: AtomicBool::new(false), } } @@ -841,4 +862,31 @@ mod test { join.await.unwrap(); count } + + #[tokio::test] + async fn test_user_code_panic() { + let h = tokio::runtime::Handle::current(); + let expected_data1 = "mydata1"; + let inner = MyObj::new(expected_data1.to_string()); + let proxy = MyObjProxy::new(h.clone(), inner); + { + let out = IMyObj::get_data_delay(&proxy, Duration::ZERO, false, None) + .await + .expect("fail to get data"); + assert_eq!(out, expected_data1); + } + // enable panic for the user code + // check the panic is converted to correct error code. + proxy + .com + .inner + .panic + .store(true, std::sync::atomic::Ordering::Relaxed); + { + let out = IMyObj::get_data_delay(&proxy, Duration::ZERO, false, None) + .await + .expect_err("should error out"); + assert_eq!(out, FabricErrorCode::E_UNEXPECTED.into()); + } + } }