diff --git a/src/python/pants/engine/process.py b/src/python/pants/engine/process.py index 9990a20b659..6c6031cebc7 100644 --- a/src/python/pants/engine/process.py +++ b/src/python/pants/engine/process.py @@ -50,6 +50,18 @@ class ProcessCacheScope(Enum): PER_SESSION = "per_session" +@dataclass(frozen=True) +class ProcessConcurrencyRange: + min: int | None = None + max: int | None = None + +@dataclass(frozen=True) +class ProcessConcurrencyExclusive: + pass + +ProcessConcurrency = ProcessConcurrencyRange | ProcessConcurrencyExclusive + + @dataclass(frozen=True) class Process: argv: tuple[str, ...] @@ -66,7 +78,7 @@ class Process: timeout_seconds: int | float jdk_home: str | None execution_slot_variable: str | None - concurrency_available: int + concurrency: ProcessConcurrency | None cache_scope: ProcessCacheScope remote_cache_speculation_delay_millis: int attempt: int @@ -89,6 +101,7 @@ def __init__( jdk_home: str | None = None, execution_slot_variable: str | None = None, concurrency_available: int = 0, + concurrency: ProcessConcurrency | None = None, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, remote_cache_speculation_delay_millis: int = 0, attempt: int = 0, @@ -146,6 +159,7 @@ def __init__( object.__setattr__(self, "jdk_home", jdk_home) object.__setattr__(self, "execution_slot_variable", execution_slot_variable) object.__setattr__(self, "concurrency_available", concurrency_available) + object.__setattr__(self, "concurrency", concurrency) object.__setattr__(self, "cache_scope", cache_scope) object.__setattr__( self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis diff --git a/src/rust/engine/process_execution/remote/src/remote_tests.rs b/src/rust/engine/process_execution/remote/src/remote_tests.rs index 916028b0a61..bb24417d799 100644 --- a/src/rust/engine/process_execution/remote/src/remote_tests.rs +++ b/src/rust/engine/process_execution/remote/src/remote_tests.rs @@ -100,6 +100,7 @@ async fn make_execute_request() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: make_environment(Platform::Linux_x86_64), remote_cache_speculation_delay: std::time::Duration::from_millis(0), @@ -193,6 +194,7 @@ async fn make_execute_request_with_instance_name() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: ProcessExecutionEnvironment { name: None, @@ -306,6 +308,7 @@ async fn make_execute_request_with_cache_key_gen_version() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: make_environment(Platform::Linux_x86_64), remote_cache_speculation_delay: std::time::Duration::from_millis(0), @@ -580,6 +583,7 @@ async fn make_execute_request_with_timeout() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: make_environment(Platform::Linux_x86_64), remote_cache_speculation_delay: std::time::Duration::from_millis(0), @@ -680,6 +684,7 @@ async fn make_execute_request_with_append_only_caches() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: make_environment(Platform::Linux_x86_64), remote_cache_speculation_delay: std::time::Duration::from_millis(0), @@ -840,6 +845,7 @@ async fn make_execute_request_using_immutable_inputs() { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Always, execution_environment: make_environment(Platform::Linux_x86_64), remote_cache_speculation_delay: std::time::Duration::from_millis(0), diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs index 5be92ff3216..1a8a8b73c4d 100644 --- a/src/rust/engine/process_execution/src/bounded.rs +++ b/src/rust/engine/process_execution/src/bounded.rs @@ -18,7 +18,7 @@ use tokio::sync::{Notify, Semaphore, SemaphorePermit}; use tokio::time::sleep; use workunit_store::{in_workunit, RunningWorkunit}; -use crate::{Context, FallibleProcessResultWithPlatform, Process, ProcessError}; +use crate::{Context, FallibleProcessResultWithPlatform, Process, ProcessError, ProcessConcurrency}; lazy_static! { // TODO: Runtime formatting is unstable in Rust, so we imitate it. @@ -32,6 +32,11 @@ lazy_static! { /// If a Process sets a non-zero `concurrency_available` value, it may be preempted (i.e. canceled /// and restarted) with a new concurrency value for a short period after starting. /// +/// If a Process provides a `concurrency` value with different min and max values, +/// it will occupy a minimum of `min` cores and a maximum of `max` cores on the semaphore and +/// may be preempted (i.e. canceled and restarted) with a new concurrency value for a short +/// period after starting. +/// #[derive(Clone)] pub struct CommandRunner { inner: Arc, @@ -72,7 +77,26 @@ impl crate::CommandRunner for CommandRunner { workunit: &mut RunningWorkunit, process: Process, ) -> Result { - let semaphore_acquisition = self.sema.acquire(process.concurrency_available); + + let total_concurrency = self.sema.state.lock().total_concurrency; + let min_concurrency = match process.concurrency { + Some(ProcessConcurrency::Range { min, .. }) => min.unwrap_or(1), + Some(ProcessConcurrency::Exclusive) => total_concurrency, + None => process.concurrency_available, + }; + + let max_concurrency = match process.concurrency { + Some(ProcessConcurrency::Range { max, .. }) => max.unwrap_or(min_concurrency), + Some(ProcessConcurrency::Exclusive) => total_concurrency, + None => process.concurrency_available, + }; + + let min_concurrency = min(max(min_concurrency, 1), total_concurrency); + let max_concurrency = min(max_concurrency, total_concurrency); + + log::debug!("Acquiring semaphore for process {} with min_concurrency: {}, max_concurrency: {}", process.description, min_concurrency, max_concurrency); + + let semaphore_acquisition = self.sema.acquire(min_concurrency, max_concurrency); let permit = in_workunit!( "acquire_command_runner_slot", // TODO: The UI uses the presence of a blocked workunit below a parent as an indication that @@ -243,7 +267,7 @@ impl AsyncSemaphore { F: FnOnce(usize) -> B, B: Future, { - let permit = self.acquire(1).await; + let permit = self.acquire(1, 1).await; let res = f(permit.task.id).await; drop(permit); res @@ -254,8 +278,8 @@ impl AsyncSemaphore { /// the given amount of concurrency. The amount actually acquired will be reported on the /// returned Permit. /// - pub async fn acquire(&self, concurrency_desired: usize) -> Permit<'_> { - let permit = self.sema.acquire().await.expect("semaphore closed"); + pub async fn acquire(&self, min_concurrency: usize, max_concurrency: usize,) -> Permit<'_> { + let permit = self.sema.acquire_many(min_concurrency as u32).await.expect("semaphore closed"); let task = { let mut state = self.state.lock(); let id = state @@ -270,7 +294,7 @@ impl AsyncSemaphore { // // This is because we cannot anticipate the number of inbound processes, and we never want to // delay a process from starting. - let concurrency_desired = max(concurrency_desired, 1); + let concurrency_desired = max(max_concurrency, min_concurrency); let concurrency_actual = min( concurrency_desired, state.total_concurrency / (state.tasks.len() + 1), diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 19bee39114c..26d5d376104 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -8,6 +8,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::convert::TryFrom; use std::fmt::{self, Debug, Display}; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; @@ -485,6 +486,29 @@ pub struct ProcessExecutionEnvironment { pub strategy: ProcessExecutionStrategy, } +#[derive(DeepSizeOf, Debug, Clone, Hash, PartialEq, Eq, Serialize)] +pub enum ProcessConcurrency { + /// A range of acceptable cpu cores with optional bounds + Range { + // Minimum number of cores to use, defaults to 1 + min: Option, + // Maximum number of cores to use, defaults to min + max: Option, + }, + /// Exclusive access to all cores + Exclusive, +} + +// TODO Unclear why I need this or when its invoked, compile error for CommandSpec +impl FromStr for ProcessConcurrency { + type Err = core::convert::Infallible; + + fn from_str(s: &str) -> Result { + log::warn!("Parsing process concurrency from string: {s}"); + Ok(ProcessConcurrency::Range { min: Some(1), max: Some(1) }) + } +} + /// /// A process to be executed. /// @@ -543,6 +567,9 @@ pub struct Process { /// started or finished). pub concurrency_available: usize, + /// The number of cores required for this process to run. + pub concurrency: Option, + #[derivative(PartialEq = "ignore", Hash = "ignore")] pub description: String, @@ -616,6 +643,7 @@ impl Process { jdk_home: None, execution_slot_variable: None, concurrency_available: 0, + concurrency: None, cache_scope: ProcessCacheScope::Successful, execution_environment: ProcessExecutionEnvironment { name: None, diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 148344e1e0d..2c1371736c7 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -15,7 +15,7 @@ use fs::{DirectoryDigest, Permissions, RelativePath}; use hashing::{Digest, Fingerprint}; use process_execution::{ local::KeepSandboxes, CacheContentBehavior, Context, InputDigests, NamedCaches, Platform, - ProcessCacheScope, ProcessExecutionEnvironment, ProcessExecutionStrategy, + ProcessCacheScope, ProcessConcurrency, ProcessExecutionEnvironment, ProcessExecutionStrategy, }; use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, Command}; @@ -75,6 +75,9 @@ struct CommandSpec { #[structopt(long)] concurrency_available: Option, + #[structopt(long)] + concurrency: Option, + #[structopt(long)] cache_key_gen_version: Option, } @@ -547,6 +550,7 @@ async fn make_request_from_flat_args( jdk_home: args.command.jdk.clone(), execution_slot_variable: None, concurrency_available: args.command.concurrency_available.unwrap_or(0), + concurrency: args.command.concurrency.clone(), cache_scope: ProcessCacheScope::Always, execution_environment, remote_cache_speculation_delay: Duration::from_millis(0), @@ -633,6 +637,7 @@ async fn extract_request_from_action_digest( }), execution_slot_variable: None, concurrency_available: 0, + concurrency: None, description: "".to_string(), level: Level::Error, append_only_caches: BTreeMap::new(), diff --git a/src/rust/engine/src/nodes/execute_process.rs b/src/rust/engine/src/nodes/execute_process.rs index b90f4f04c33..1d965011da0 100644 --- a/src/rust/engine/src/nodes/execute_process.rs +++ b/src/rust/engine/src/nodes/execute_process.rs @@ -9,11 +9,11 @@ use deepsize::DeepSizeOf; use fs::RelativePath; use graph::CompoundNode; use process_execution::{ - self, CacheName, InputDigests, Process, ProcessCacheScope, ProcessExecutionStrategy, - ProcessResultSource, + self, CacheName, InputDigests, Process, ProcessCacheScope, ProcessConcurrency, ProcessExecutionStrategy, ProcessResultSource }; use pyo3::prelude::{PyAny, Python}; use pyo3::pybacked::PyBackedStr; +use pyo3::types::{PyAnyMethods, PyStringMethods, PyTypeMethods}; use pyo3::Bound; use store::{self, Store, StoreError}; use workunit_store::{ @@ -124,6 +124,27 @@ impl ExecuteProcess { .map_err(|e| format!("Failed to get `execution_slot_variable` for field: {e}"))?; let concurrency_available: usize = externs::getattr(value, "concurrency_available")?; + + let concurrency_value: Option> = externs::getattr(value, "concurrency")?; + let concurrency: Option = match concurrency_value { + None => Ok(None), + Some(conc) => { + let py_type = conc.get_type(); + let py_name = py_type.name().unwrap(); + let type_name = py_name.to_str().unwrap(); + match type_name { + "ProcessConcurrencyRange" => { + let min: Option = externs::getattr(&conc, "min")?; + let max: Option = externs::getattr(&conc, "max")?; + Ok(Some(ProcessConcurrency::Range { min, max })) + } + "ProcessConcurrencyExclusive" => Ok(Some(ProcessConcurrency::Exclusive)), + _ => Err(format!("Unknown ProcessConcurrency type: {}", type_name)), + } + } + }?; + + log::debug!("Parsed concurrency: {:?}", concurrency); let cache_scope: ProcessCacheScope = { let cache_scope_enum: Bound<'_, PyAny> = externs::getattr(value, "cache_scope")?; @@ -151,6 +172,7 @@ impl ExecuteProcess { jdk_home, execution_slot_variable, concurrency_available, + concurrency, cache_scope, execution_environment: process_config.environment, remote_cache_speculation_delay,