Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure we can reach the user's requested FVM concurrency #449

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ serde = "1.0.117"
serde_tuple = "0.5"
safer-ffi = { version = "0.0.7", features = ["proc_macros"] }
filecoin-proofs-api = { version = "16.1", default-features = false }
yastl = "0.1.2"

[dev-dependencies]
memmap2 = "0.5"
tempfile = "3.0.8"

[features]
default = ["cuda", "multicore-sdr" ]
default = ["cuda", "multicore-sdr"]
blst-portable = ["bls-signatures/blst-portable", "blstrs/portable"]
cuda = ["filecoin-proofs-api/cuda", "rust-gpu-tools/cuda", "fvm2/cuda", "fvm3/cuda", "fvm4/cuda"]
cuda-supraseal = ["filecoin-proofs-api/cuda-supraseal", "rust-gpu-tools/cuda", "fvm3/cuda-supraseal", "fvm4/cuda-supraseal"]
Expand Down
73 changes: 10 additions & 63 deletions rust/src/fvm/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::sync::{Arc, Mutex};

use anyhow::anyhow;
Expand All @@ -18,7 +17,7 @@ use super::externs::CgoExterns;
use super::types::*;

// Generic executor; uses the current (v3) engine types
pub trait CgoExecutor {
pub trait CgoExecutor: Send {
fn execute_message(
&mut self,
msg: Message,
Expand Down Expand Up @@ -65,9 +64,6 @@ pub struct MultiEngineContainer {
engines: Mutex<HashMap<EngineVersion, Arc<dyn AbstractMultiEngine + 'static>>>,
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=128;

impl TryFrom<u32> for EngineVersion {
type Error = anyhow::Error;
fn try_from(value: u32) -> Result<Self, Self::Error> {
Expand All @@ -81,41 +77,6 @@ impl TryFrom<u32> for EngineVersion {
}

impl MultiEngineContainer {
/// Constructs a new multi-engine container with the default concurrency (4).
pub fn new() -> MultiEngineContainer {
Self::with_concurrency(4)
}

/// Constructs a new multi-engine container with the concurrency specified in the
/// `LOTUS_FVM_CONCURRENCY` environment variable.
pub fn new_env() -> MultiEngineContainer {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return Self::new(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return Self::new();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return Self::new();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return Self::new();
}
Self::with_concurrency(concurrency)
}

pub fn with_concurrency(concurrency: u32) -> MultiEngineContainer {
MultiEngineContainer {
engines: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -146,12 +107,6 @@ impl MultiEngineContainer {
}
}

impl Default for MultiEngineContainer {
fn default() -> MultiEngineContainer {
MultiEngineContainer::new()
}
}

// fvm v4 implementation
mod v4 {
use anyhow::anyhow;
Expand All @@ -160,10 +115,7 @@ mod v4 {

use fvm4::call_manager::DefaultCallManager as DefaultCallManager4;
use fvm4::engine::{EnginePool as EnginePool4, MultiEngine as MultiEngine4};
use fvm4::executor::{
ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4,
ThreadedExecutor as ThreadedExecutor4,
};
use fvm4::executor::{ApplyKind, ApplyRet, DefaultExecutor as DefaultExecutor4};
use fvm4::kernel::filecoin::DefaultFilecoinKernel as DefaultFilecoinKernel4;
use fvm4::machine::{DefaultMachine as DefaultMachine4, NetworkConfig};
use fvm4_shared::{chainid::ChainID, clock::ChainEpoch, message::Message};
Expand All @@ -175,14 +127,13 @@ mod v4 {
use super::Config;

type CgoMachine4 = DefaultMachine4<CgoBlockstore, CgoExterns>;
type BaseExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;
type CgoExecutor4 = ThreadedExecutor4<BaseExecutor4>;
type CgoExecutor4 = DefaultExecutor4<DefaultFilecoinKernel4<DefaultCallManager4<CgoMachine4>>>;

fn new_executor(
engine_pool: EnginePool4,
machine: CgoMachine4,
) -> anyhow::Result<CgoExecutor4> {
Ok(ThreadedExecutor4(BaseExecutor4::new(engine_pool, machine)?))
CgoExecutor4::new(engine_pool, machine)
}

impl CgoExecutor for CgoExecutor4 {
Expand Down Expand Up @@ -254,8 +205,7 @@ mod v3 {
};
use fvm3::engine::{EnginePool as EnginePool3, MultiEngine as MultiEngine3};
use fvm3::executor::{
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3,
DefaultExecutor as DefaultExecutor3, ThreadedExecutor as ThreadedExecutor3,
ApplyFailure as ApplyFailure3, ApplyKind as ApplyKind3, DefaultExecutor as DefaultExecutor3,
};
use fvm3::machine::{DefaultMachine as DefaultMachine3, NetworkConfig as NetworkConfig3};
use fvm3::trace::ExecutionEvent as ExecutionEvent3;
Expand Down Expand Up @@ -284,14 +234,13 @@ mod v3 {
use super::Config;

type CgoMachine3 = DefaultMachine3<CgoBlockstore, CgoExterns>;
type BaseExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;
type CgoExecutor3 = ThreadedExecutor3<BaseExecutor3>;
type CgoExecutor3 = DefaultExecutor3<DefaultKernel3<DefaultCallManager3<CgoMachine3>>>;

fn new_executor(
engine_pool: EnginePool3,
machine: CgoMachine3,
) -> anyhow::Result<CgoExecutor3> {
Ok(ThreadedExecutor3(BaseExecutor3::new(engine_pool, machine)?))
CgoExecutor3::new(engine_pool, machine)
}

impl CgoExecutor for CgoExecutor3 {
Expand Down Expand Up @@ -533,8 +482,7 @@ mod v2 {
backtrace::Cause as Cause2, DefaultCallManager as DefaultCallManager2,
};
use fvm2::executor::{
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2,
DefaultExecutor as DefaultExecutor2, ThreadedExecutor as ThreadedExecutor2,
ApplyFailure as ApplyFailure2, ApplyKind as ApplyKind2, DefaultExecutor as DefaultExecutor2,
};
use fvm2::machine::{
DefaultMachine as DefaultMachine2, MultiEngine as MultiEngine2,
Expand Down Expand Up @@ -565,11 +513,10 @@ mod v2 {
use super::Config;

type CgoMachine2 = DefaultMachine2<CgoBlockstore, CgoExterns>;
type BaseExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;
type CgoExecutor2 = ThreadedExecutor2<BaseExecutor2>;
type CgoExecutor2 = DefaultExecutor2<DefaultKernel2<DefaultCallManager2<CgoMachine2>>>;

fn new_executor(machine: CgoMachine2) -> CgoExecutor2 {
ThreadedExecutor2(BaseExecutor2::new(machine))
CgoExecutor2::new(machine)
}

fn bytes_to_block(bytes: RawBytes) -> Option<IpldBlock> {
Expand Down
69 changes: 67 additions & 2 deletions rust/src/fvm/machine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::convert::{TryFrom, TryInto};
use std::ops::RangeInclusive;

use anyhow::{anyhow, Context};
use cid::Cid;
Expand Down Expand Up @@ -27,8 +28,54 @@ use super::types::*;
use crate::destructor;
use crate::util::types::{catch_panic_response, catch_panic_response_no_default, Result};

const STACK_SIZE: usize = 64 << 20; // 64MiB

lazy_static! {
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::new_env();
static ref CONCURRENCY: u32 = get_concurrency();
static ref ENGINES: MultiEngineContainer = MultiEngineContainer::with_concurrency(*CONCURRENCY);
static ref THREAD_POOL: yastl::Pool = yastl::Pool::with_config(
*CONCURRENCY as usize,
yastl::ThreadConfig::new()
.prefix("fvm")
.stack_size(STACK_SIZE)
);
}

const LOTUS_FVM_CONCURRENCY_ENV_NAME: &str = "LOTUS_FVM_CONCURRENCY";
const VALID_CONCURRENCY_RANGE: RangeInclusive<u32> = 1..=256;

fn available_parallelism() -> u32 {
std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(8) as u32
}

fn get_concurrency() -> u32 {
let valosstr = match std::env::var_os(LOTUS_FVM_CONCURRENCY_ENV_NAME) {
Some(v) => v,
None => return available_parallelism(),
};
let valstr = match valosstr.to_str() {
Some(s) => s,
None => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value");
return available_parallelism();
}
};
let concurrency: u32 = match valstr.parse() {
Ok(v) => v,
Err(e) => {
log::error!("{LOTUS_FVM_CONCURRENCY_ENV_NAME} has invalid value: {e}");
return available_parallelism();
}
};
if !VALID_CONCURRENCY_RANGE.contains(&concurrency) {
log::error!(
"{LOTUS_FVM_CONCURRENCY_ENV_NAME} must be in the range {VALID_CONCURRENCY_RANGE:?}, not {concurrency}"
);
return available_parallelism();
}
concurrency
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -181,14 +228,32 @@ fn create_fvm_debug_machine(
)
}

fn with_new_stack<F, T>(name: &str, pool: &yastl::Pool, callback: F) -> repr_c::Box<Result<T>>
where
T: Sized + Default + Send,
F: FnOnce() -> anyhow::Result<T> + std::panic::UnwindSafe + Send,
{
let mut res = None;
pool.scoped(|scope| scope.execute(|| res = Some(catch_panic_response(name, callback))));

res.unwrap_or_else(|| {
repr_c::Box::new(Result::err(
format!("failed to schedule {name}")
.into_bytes()
.into_boxed_slice(),
))
})
}

#[ffi_export]
fn fvm_machine_execute_message(
executor: &'_ InnerFvmMachine,
message: c_slice::Ref<u8>,
chain_len: u64,
apply_kind: u64, /* 0: Explicit, _: Implicit */
) -> repr_c::Box<Result<FvmMachineExecuteResponse>> {
catch_panic_response("fvm_machine_execute_message", || {
// Execute in the thread-pool because we need a 64MiB stack.
with_new_stack("fvm_machine_execute_message", &THREAD_POOL, || {
let apply_kind = if apply_kind == 0 {
ApplyKind::Explicit
} else {
Expand Down