Skip to content

Commit

Permalink
Propagate user app panics to SF for async apis (#100)
Browse files Browse the repository at this point in the history
Previously, a panic in user app code running inside an SF callback/notification
caused the process to exit.
This PR changes this behavior: for all SF async API notifications with
Begin and End calls, mssf provides the user an async function to implement.
If the user function panics, mssf considers the callback/notification failed
with error code E_UNEXPECTED and returns this error code to SF.
For synchronous APIs that the user implements, a panic is still unhandled and
will cause the process to terminate, because mssf currently does not do
anything for synchronous SF calls: user app code is executed directly on
SF threads.
  • Loading branch information
youyuanwu authored Dec 2, 2024
1 parent a0665c1 commit c19c2ef
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 32 deletions.
76 changes: 51 additions & 25 deletions crates/libs/core/src/runtime/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,31 @@ use std::future::Future;
use tokio::{runtime::Handle, sync::mpsc::channel};
use tracing::info;

use crate::error::FabricErrorCode;

// Executor is used by rs to post jobs to execute in the background.
// Sync is needed because the executor is used across await boundaries.
pub trait Executor: Clone + Sync + Send + 'static {
// Required functions

// spawns the task to run in background
fn spawn<F>(&self, future: F)
/// spawns the task to run in background, and returns a join handle
/// where the future's result can be awaited.
/// If the future panics, the join handle should return an error code.
/// This is primarily used by mssf Bridge to execute user app async callbacks/notifications.
/// User app impl future may panic, and mssf propagates panic as an error in JoinHandle
/// to SF.
fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
where
F: Future + Send + 'static;
F: Future + Send + 'static,
F::Output: Send;

// run the future on the executor until completion.
/// run the future on the executor until completion.
fn block_on<F: Future>(&self, future: F) -> F::Output;

// provided functions

// Run the executor and block the current thread until ctrl-c event is
// Received.
/// Run the executor and block the current thread until ctrl-c event is
/// Received.
fn run_until_ctrl_c(&self) {
info!("DefaultExecutor: setting up ctrl-c event.");
// set ctrc event
Expand All @@ -44,47 +52,65 @@ pub trait Executor: Clone + Sync + Send + 'static {
}
}

/// Handle can be awaited to get the success status of the task.
/// The handle is primarily needed to propagate background task error
/// back to SF.
///
/// The `trait_variant::make` attribute below also declares a `JoinHandle`
/// variant of this trait with a `Send` bound; `JoinHandle` is the name used
/// by `Executor::spawn` and implemented by `DefaultJoinHandle` in this file.
#[trait_variant::make(JoinHandle: Send)]
pub trait LocalJoinHandle<T> {
/// Consumes the handle and waits for the task to finish.
/// Returns the task output on success, or an error if the task did not
/// complete successfully.
async fn join(self) -> crate::Result<T>;
}

/// Executor implementation that runs work on a tokio runtime through a
/// runtime `Handle`.
#[derive(Clone)]
pub struct DefaultExecutor {
/// Tokio runtime handle used to spawn and block on futures.
rt: Handle,
}

/// Default implementation of the JoinHandle using tokio.
/// It wraps the tokio task handle produced when `DefaultExecutor` spawns a
/// future, so the task outcome can be awaited through `join`.
pub struct DefaultJoinHandle<T> {
/// The wrapped tokio task handle.
inner: tokio::task::JoinHandle<T>,
}

impl DefaultExecutor {
pub fn new(rt: Handle) -> DefaultExecutor {
DefaultExecutor { rt }
}
}

impl Executor for DefaultExecutor {
fn spawn<F>(&self, future: F)
fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send,
{
let h = self.rt.spawn(async move {
future.await;
});

// Monitor user future.
// If user task has panic, exit the process.
// TODO: expose a config to control this behavior.
// It is observed that if user task panics, sf operation are stuck.
self.rt.spawn(async move {
let ok = h.await;
if ok.is_err() {
info!(
"DefaultExecutor: User spawned future paniced {}",
ok.unwrap_err()
);
std::process::exit(1);
}
});
let h = self.rt.spawn(future);
DefaultJoinHandle::<F::Output> { inner: h }
}

fn block_on<F: Future>(&self, future: F) -> F::Output {
self.rt.block_on(future)
}
}

impl<T: Send> JoinHandle<T> for DefaultJoinHandle<T> {
async fn join(self) -> crate::Result<T> {
match self.inner.await {
Ok(x) => Ok(x),
Err(e) => {
let e = if e.is_cancelled() {
// we never cancel in executor
FabricErrorCode::E_ABORT
} else if e.is_panic() {
FabricErrorCode::E_UNEXPECTED
} else {
FabricErrorCode::E_FAIL
};
tracing::error!("DefaultJoinHandle: background task failed: {e}");
Err(e.into())
}
}
}
}

#[cfg(test)]
mod test {
use super::DefaultExecutor;
Expand Down
62 changes: 55 additions & 7 deletions crates/libs/core/src/sync/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ use mssf_com::FabricCommon::{
pub use tokio_util::sync::CancellationToken;
use windows_core::{implement, AsImpl};

use crate::{error::FabricErrorCode, runtime::executor::Executor};
use crate::{
error::FabricErrorCode,
runtime::executor::{Executor, JoinHandle},
};

/// Async operation context for bridging rust code into SF COM api that supports cancellation.
#[implement(IFabricAsyncOperationContext)]
pub struct BridgeContext3<T>
where
T: 'static,
{
content: Cell<Option<T>>,
/// The task result. Initially it is None.
/// If the task panics, the error is propagated here.
content: Cell<Option<crate::Result<T>>>,
/// Indicates the async operation has completed or not.
/// This is a memory barrier for making the content available
/// from writer thread to the reader thread. It is needed because
Expand Down Expand Up @@ -66,6 +71,8 @@ where
/// This is intended to be used in SF Begin COM api, where
/// rust code is spawned in background and the context is returned
/// to caller.
/// If the future panics, an error is set in the resulting content,
/// caller will still get callback and receive an error in the End api.
/// This api is in some sense unsafe, because the developer needs to ensure
/// the following:
/// * return type of the future needs to match SF COM api end return type.
Expand All @@ -79,10 +86,16 @@ where
{
let self_cp: IFabricAsyncOperationContext = self.into();
let self_cp2 = self_cp.clone();
let rt_cp = rt.clone();
rt.spawn(async move {
let ok = future.await;
// Run user code in a task and wait on its status.
// If user code panics we propagate the error back to SF.
let task_res = rt_cp.spawn(future).join().await;
// TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.

// We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
let self_impl: &BridgeContext3<T> = unsafe { self_cp.as_impl() };
self_impl.set_content(ok);
self_impl.set_content(task_res);
let cb = self_impl.Callback().unwrap();
unsafe { cb.Invoke(&self_cp) };
});
Expand All @@ -103,7 +116,7 @@ where

/// Set the content for the ctx.
/// Marks the ctx as completed.
fn set_content(&self, content: T) {
fn set_content(&self, content: crate::Result<T>) {
let prev = self.content.replace(Some(content));
assert!(prev.is_none());
self.set_complete();
Expand All @@ -113,7 +126,7 @@ where
/// can only be called once after set content.
fn consume_content(&self) -> crate::Result<T> {
match self.check_complete() {
true => Ok(self.content.take().expect("content is consumed twice.")),
true => self.content.take().expect("content is consumed twice."),
false => {
if self.token.is_cancelled() {
Err(FabricErrorCode::E_ABORT.into())
Expand Down Expand Up @@ -395,7 +408,7 @@ where
mod test {
use std::{
cell::Cell,
sync::{Arc, Mutex},
sync::{atomic::AtomicBool, Arc, Mutex},
time::Duration,
};

Expand Down Expand Up @@ -483,6 +496,7 @@ mod test {
/// Test Obj for cancellation
pub struct MyObj {
// String payload held by the test object.
data: Mutex<Cell<String>>,
// When true, the async test methods panic on entry; used to exercise
// propagation of user code panics.
panic: AtomicBool,
}

// Implement the test trait
Expand All @@ -493,6 +507,9 @@ mod test {
ignore_cancel: bool,
token: Option<CancellationToken>,
) -> crate::Result<String> {
if self.panic.load(std::sync::atomic::Ordering::Relaxed) {
panic!("test panic is set")
}
if delay.is_zero() {
// This is needed to make future is breakable in bench test in select
tokio::task::yield_now().await;
Expand Down Expand Up @@ -524,6 +541,9 @@ mod test {
delay: Duration,
token: Option<CancellationToken>,
) -> crate::Result<()> {
if self.panic.load(std::sync::atomic::Ordering::Relaxed) {
panic!("test panic is set")
}
if delay.is_zero() {
// This is needed to make future is breakable in bench test in select
tokio::task::yield_now().await;
Expand Down Expand Up @@ -553,6 +573,7 @@ mod test {
/// Builds a test object holding `data`, with panic injection turned off.
pub fn new(data: String) -> Self {
    let data = Mutex::new(Cell::new(data));
    let panic = AtomicBool::new(false);
    Self { data, panic }
}

Expand Down Expand Up @@ -841,4 +862,31 @@ mod test {
join.await.unwrap();
count
}

/// Checks that a panic in user code is reported to the caller as
/// `E_UNEXPECTED`.
#[tokio::test]
async fn test_user_code_panic() {
    use std::sync::atomic::Ordering;

    let expected = "mydata1";
    let rt_handle = tokio::runtime::Handle::current();
    let proxy = MyObjProxy::new(rt_handle.clone(), MyObj::new(expected.to_string()));

    // Baseline: with panic injection off the call succeeds and returns the data.
    let data = IMyObj::get_data_delay(&proxy, Duration::ZERO, false, None)
        .await
        .expect("fail to get data");
    assert_eq!(data, expected);

    // Turn on panic injection in the user code, then check that the panic
    // comes back as the expected error code.
    proxy.com.inner.panic.store(true, Ordering::Relaxed);

    let err = IMyObj::get_data_delay(&proxy, Duration::ZERO, false, None)
        .await
        .expect_err("should error out");
    assert_eq!(err, FabricErrorCode::E_UNEXPECTED.into());
}
}

0 comments on commit c19c2ef

Please sign in to comment.