Allow processes to occupy more than one slot in the execution semaphore #21960

Draft. Wants to merge 1 commit into base: main.
16 changes: 15 additions & 1 deletion src/python/pants/engine/process.py
@@ -50,6 +50,18 @@ class ProcessCacheScope(Enum):
PER_SESSION = "per_session"


@dataclass(frozen=True)
class ProcessConcurrencyRange:
min: int | None = None
max: int | None = None

@dataclass(frozen=True)
class ProcessConcurrencyExclusive:
pass

ProcessConcurrency = ProcessConcurrencyRange | ProcessConcurrencyExclusive


@dataclass(frozen=True)
class Process:
argv: tuple[str, ...]
@@ -66,7 +78,7 @@ class Process:
timeout_seconds: int | float
jdk_home: str | None
execution_slot_variable: str | None
concurrency_available: int
concurrency: ProcessConcurrency | None
cache_scope: ProcessCacheScope
remote_cache_speculation_delay_millis: int
attempt: int
@@ -89,6 +101,7 @@ def __init__(
jdk_home: str | None = None,
execution_slot_variable: str | None = None,
concurrency_available: int = 0,
concurrency: ProcessConcurrency | None = None,
cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL,
remote_cache_speculation_delay_millis: int = 0,
attempt: int = 0,
@@ -146,6 +159,7 @@ def __init__(
object.__setattr__(self, "jdk_home", jdk_home)
object.__setattr__(self, "execution_slot_variable", execution_slot_variable)
object.__setattr__(self, "concurrency_available", concurrency_available)
object.__setattr__(self, "concurrency", concurrency)
object.__setattr__(self, "cache_scope", cache_scope)
object.__setattr__(
self, "remote_cache_speculation_delay_millis", remote_cache_speculation_delay_millis
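For context, a minimal sketch of how a plugin could request the new field once this lands. It assumes `ProcessConcurrencyRange` and `ProcessConcurrencyExclusive` are importable from `pants.engine.process` as defined in the diff above; the `argv` and `description` values are placeholders.

```python
from pants.engine.process import (
    Process,
    ProcessConcurrencyExclusive,
    ProcessConcurrencyRange,
)

# Occupy between 2 and 4 slots on the local execution semaphore.
compile_process = Process(
    argv=("compiler", "src/lib.ext"),
    description="Compile src/lib.ext",
    concurrency=ProcessConcurrencyRange(min=2, max=4),
)

# Occupy every slot, so no other local process runs alongside it.
benchmark_process = Process(
    argv=("./run_benchmarks.sh",),
    description="Run benchmarks",
    concurrency=ProcessConcurrencyExclusive(),
)
```

Leaving `concurrency` unset (the default `None`) preserves the existing `concurrency_available` behavior.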
6 changes: 6 additions & 0 deletions src/rust/engine/process_execution/remote/src/remote_tests.rs
@@ -100,6 +100,7 @@ async fn make_execute_request() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: make_environment(Platform::Linux_x86_64),
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
@@ -193,6 +194,7 @@ async fn make_execute_request_with_instance_name() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: ProcessExecutionEnvironment {
name: None,
@@ -306,6 +308,7 @@ async fn make_execute_request_with_cache_key_gen_version() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: make_environment(Platform::Linux_x86_64),
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
@@ -580,6 +583,7 @@ async fn make_execute_request_with_timeout() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: make_environment(Platform::Linux_x86_64),
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
@@ -680,6 +684,7 @@ async fn make_execute_request_with_append_only_caches() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: make_environment(Platform::Linux_x86_64),
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
@@ -840,6 +845,7 @@ async fn make_execute_request_using_immutable_inputs() {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Always,
execution_environment: make_environment(Platform::Linux_x86_64),
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
36 changes: 30 additions & 6 deletions src/rust/engine/process_execution/src/bounded.rs
@@ -18,7 +18,7 @@ use tokio::sync::{Notify, Semaphore, SemaphorePermit};
use tokio::time::sleep;
use workunit_store::{in_workunit, RunningWorkunit};

use crate::{Context, FallibleProcessResultWithPlatform, Process, ProcessError};
use crate::{Context, FallibleProcessResultWithPlatform, Process, ProcessConcurrency, ProcessError};

lazy_static! {
// TODO: Runtime formatting is unstable in Rust, so we imitate it.
@@ -32,6 +32,11 @@ lazy_static! {
/// If a Process sets a non-zero `concurrency_available` value, it may be preempted (i.e. canceled
/// and restarted) with a new concurrency value for a short period after starting.
///
/// If a Process provides a `concurrency` range with different `min` and `max` values, it will
/// occupy at least `min` and at most `max` slots on the semaphore, and may be preempted (i.e.
/// canceled and restarted) with a new concurrency value for a short period after starting.
/// A Process that requests exclusive concurrency occupies every slot on the semaphore.
///
#[derive(Clone)]
pub struct CommandRunner {
inner: Arc<dyn crate::CommandRunner>,
@@ -72,7 +77,26 @@ impl crate::CommandRunner for CommandRunner {
workunit: &mut RunningWorkunit,
process: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
let semaphore_acquisition = self.sema.acquire(process.concurrency_available);

let total_concurrency = self.sema.state.lock().total_concurrency;
let min_concurrency = match process.concurrency {
Some(ProcessConcurrency::Range { min, .. }) => min.unwrap_or(1),
Some(ProcessConcurrency::Exclusive) => total_concurrency,
None => process.concurrency_available,
};

let max_concurrency = match process.concurrency {
Some(ProcessConcurrency::Range { max, .. }) => max.unwrap_or(min_concurrency),
Some(ProcessConcurrency::Exclusive) => total_concurrency,
None => process.concurrency_available,
};

let min_concurrency = min(max(min_concurrency, 1), total_concurrency);
let max_concurrency = min(max_concurrency, total_concurrency);

log::debug!("Acquiring semaphore for process {} with min_concurrency: {}, max_concurrency: {}", process.description, min_concurrency, max_concurrency);

let semaphore_acquisition = self.sema.acquire(min_concurrency, max_concurrency);
let permit = in_workunit!(
"acquire_command_runner_slot",
// TODO: The UI uses the presence of a blocked workunit below a parent as an indication that
@@ -243,7 +267,7 @@ impl AsyncSemaphore {
F: FnOnce(usize) -> B,
B: Future<Output = O>,
{
let permit = self.acquire(1).await;
let permit = self.acquire(1, 1).await;
let res = f(permit.task.id).await;
drop(permit);
res
@@ -254,8 +278,8 @@ impl AsyncSemaphore {
/// the given amount of concurrency. The amount actually acquired will be reported on the
/// returned Permit.
///
pub async fn acquire(&self, concurrency_desired: usize) -> Permit<'_> {
let permit = self.sema.acquire().await.expect("semaphore closed");
pub async fn acquire(&self, min_concurrency: usize, max_concurrency: usize) -> Permit<'_> {
let permit = self.sema.acquire_many(min_concurrency as u32).await.expect("semaphore closed");
let task = {
let mut state = self.state.lock();
let id = state
@@ -270,7 +294,7 @@
//
// This is because we cannot anticipate the number of inbound processes, and we never want to
// delay a process from starting.
let concurrency_desired = max(concurrency_desired, 1);
let concurrency_desired = max(max_concurrency, min_concurrency);
let concurrency_actual = min(
concurrency_desired,
state.total_concurrency / (state.tasks.len() + 1),
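To make the new acquisition path concrete, here is a rough Python rendering of the grant arithmetic above. It is illustrative only, not engine code, and assumes `min_c`/`max_c` have already been resolved from the `ProcessConcurrency` value as in the match arms earlier in this hunk.

```python
def plan_acquisition(min_c: int, max_c: int, total: int, running: int) -> tuple[int, int]:
    """Return (permits reserved up front, concurrency initially granted)."""
    # Both bounds are clamped to the size of the pool, and the minimum is
    # reserved on the semaphore before the process starts.
    min_c = min(max(min_c, 1), total)
    max_c = min(max_c, total)
    reserved = min_c
    # The granted concurrency aims for the maximum, capped by an even share
    # of the pool across this task and the tasks already running.
    desired = max(max_c, min_c)
    granted = min(desired, total // (running + 1))
    return reserved, granted


# A 2..8 range on a 16-slot pool with three tasks already running:
# reserve 2 permits, and initially grant min(8, 16 // 4) = 4 slots.
print(plan_acquisition(2, 8, total=16, running=3))  # (2, 4)
```

As with `concurrency_available`, the granted value may then be adjusted by preemption for a short period while other work starts or finishes.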
28 changes: 28 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
@@ -8,6 +8,7 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::convert::TryFrom;
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use async_trait::async_trait;
@@ -485,6 +486,29 @@ pub struct ProcessExecutionEnvironment {
pub strategy: ProcessExecutionStrategy,
}

#[derive(DeepSizeOf, Debug, Clone, Hash, PartialEq, Eq, Serialize)]
pub enum ProcessConcurrency {
/// A range of acceptable CPU cores, with optional bounds.
Range {
/// Minimum number of cores to use; defaults to 1.
min: Option<usize>,
/// Maximum number of cores to use; defaults to `min`.
max: Option<usize>,
},
/// Exclusive access to all cores.
Exclusive,
}

// structopt/clap parses the `concurrency` flag on `CommandSpec` via `FromStr`, so an
// implementation is required. It accepts `exclusive`, a single count `N`, or a `MIN,MAX` range.
impl FromStr for ProcessConcurrency {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.eq_ignore_ascii_case("exclusive") {
return Ok(ProcessConcurrency::Exclusive);
}
let parse = |v: &str| v.trim().parse::<usize>().map_err(|e| format!("invalid concurrency `{v}`: {e}"));
let (min, max) = match s.split_once(',') {
Some((min, max)) => (parse(min)?, parse(max)?),
None => { let n = parse(s)?; (n, n) },
};
Ok(ProcessConcurrency::Range { min: Some(min), max: Some(max) })
}
}

///
/// A process to be executed.
///
@@ -543,6 +567,9 @@ pub struct Process {
/// started or finished).
pub concurrency_available: usize,

/// The concurrency to request for this process: either a min/max range of cores, or an
/// exclusive claim on all cores. When set, this takes precedence over `concurrency_available`.
pub concurrency: Option<ProcessConcurrency>,

#[derivative(PartialEq = "ignore", Hash = "ignore")]
pub description: String,

@@ -616,6 +643,7 @@ impl Process {
jdk_home: None,
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
cache_scope: ProcessCacheScope::Successful,
execution_environment: ProcessExecutionEnvironment {
name: None,
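As a quick reference for the defaults documented on the enum, the three shapes resolve to concrete bounds as follows. This is a Python mirror for illustration only; the real resolution happens in `bounded.rs` above, and `total_slots` stands for the semaphore's total concurrency.

```python
from pants.engine.process import ProcessConcurrencyExclusive, ProcessConcurrencyRange


def resolve_bounds(concurrency, total_slots: int) -> tuple[int, int]:
    """Resolve a concurrency request into concrete (min, max) slot bounds."""
    if isinstance(concurrency, ProcessConcurrencyExclusive):
        return total_slots, total_slots  # Exclusive: claim every slot.
    min_c = 1 if concurrency.min is None else concurrency.min  # min defaults to 1
    max_c = min_c if concurrency.max is None else concurrency.max  # max defaults to min
    return min_c, max_c


assert resolve_bounds(ProcessConcurrencyRange(), 8) == (1, 1)
assert resolve_bounds(ProcessConcurrencyRange(min=2), 8) == (2, 2)
assert resolve_bounds(ProcessConcurrencyRange(min=2, max=6), 8) == (2, 6)
assert resolve_bounds(ProcessConcurrencyExclusive(), 8) == (8, 8)
```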
7 changes: 6 additions & 1 deletion src/rust/engine/process_executor/src/main.rs
@@ -15,7 +15,7 @@ use fs::{DirectoryDigest, Permissions, RelativePath};
use hashing::{Digest, Fingerprint};
use process_execution::{
local::KeepSandboxes, CacheContentBehavior, Context, InputDigests, NamedCaches, Platform,
ProcessCacheScope, ProcessExecutionEnvironment, ProcessExecutionStrategy,
ProcessCacheScope, ProcessConcurrency, ProcessExecutionEnvironment, ProcessExecutionStrategy,
};
use prost::Message;
use protos::gen::build::bazel::remote::execution::v2::{Action, Command};
@@ -75,6 +75,9 @@ struct CommandSpec {
#[structopt(long)]
concurrency_available: Option<usize>,

#[structopt(long)]
concurrency: Option<ProcessConcurrency>,

#[structopt(long)]
cache_key_gen_version: Option<String>,
}
@@ -547,6 +550,7 @@ async fn make_request_from_flat_args(
jdk_home: args.command.jdk.clone(),
execution_slot_variable: None,
concurrency_available: args.command.concurrency_available.unwrap_or(0),
concurrency: args.command.concurrency.clone(),
cache_scope: ProcessCacheScope::Always,
execution_environment,
remote_cache_speculation_delay: Duration::from_millis(0),
@@ -633,6 +637,7 @@ async fn extract_request_from_action_digest(
}),
execution_slot_variable: None,
concurrency_available: 0,
concurrency: None,
description: "".to_string(),
level: Level::Error,
append_only_caches: BTreeMap::new(),
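A note on the new flag: `CommandSpec` is a structopt struct, so `--concurrency` is parsed through the `FromStr` impl in `lib.rs` above. With the parse format sketched there (an assumption of this draft, not something the PR specifies), `--concurrency 4` would map to `Range { min: Some(4), max: Some(4) }`, `--concurrency 2,4` to `Range { min: Some(2), max: Some(4) }`, and `--concurrency exclusive` to `Exclusive`.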
26 changes: 24 additions & 2 deletions src/rust/engine/src/nodes/execute_process.rs
@@ -9,11 +9,11 @@ use deepsize::DeepSizeOf;
use fs::RelativePath;
use graph::CompoundNode;
use process_execution::{
self, CacheName, InputDigests, Process, ProcessCacheScope, ProcessExecutionStrategy,
ProcessResultSource,
self, CacheName, InputDigests, Process, ProcessCacheScope, ProcessConcurrency, ProcessExecutionStrategy, ProcessResultSource
};
use pyo3::prelude::{PyAny, Python};
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyAnyMethods, PyStringMethods, PyTypeMethods};
use pyo3::Bound;
use store::{self, Store, StoreError};
use workunit_store::{
@@ -124,6 +124,27 @@ impl ExecuteProcess {
.map_err(|e| format!("Failed to get `execution_slot_variable` for field: {e}"))?;

let concurrency_available: usize = externs::getattr(value, "concurrency_available")?;

let concurrency_value: Option<Bound<'_, PyAny>> = externs::getattr(value, "concurrency")?;
let concurrency: Option<ProcessConcurrency> = match concurrency_value {
None => Ok(None),
Some(conc) => {
let py_type = conc.get_type();
let py_name = py_type.name().unwrap();
let type_name = py_name.to_str().unwrap();
match type_name {
"ProcessConcurrencyRange" => {
let min: Option<usize> = externs::getattr(&conc, "min")?;
let max: Option<usize> = externs::getattr(&conc, "max")?;
Ok(Some(ProcessConcurrency::Range { min, max }))
}
"ProcessConcurrencyExclusive" => Ok(Some(ProcessConcurrency::Exclusive)),
_ => Err(format!("Unknown ProcessConcurrency type: {}", type_name)),
}
}
}?;

log::debug!("Parsed concurrency: {:?}", concurrency);

let cache_scope: ProcessCacheScope = {
let cache_scope_enum: Bound<'_, PyAny> = externs::getattr(value, "cache_scope")?;
@@ -151,6 +172,7 @@ impl ExecuteProcess {
jdk_home,
execution_slot_variable,
concurrency_available,
concurrency,
cache_scope,
execution_environment: process_config.environment,
remote_cache_speculation_delay,