diff --git a/Cargo.lock b/Cargo.lock index 8fb8f14..3305f09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,24 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "windows-sys 0.61.2", +] + [[package]] name = "async-openai" version = "0.33.1" @@ -679,6 +697,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -1446,6 +1473,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -2850,6 +2887,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -3050,6 +3093,20 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -5454,7 +5511,6 @@ dependencies = [ "educe", "expect-test", "itertools 0.14.0", - "libc", "native-tls", "serde", "serde_json", @@ -5467,6 +5523,7 @@ dependencies = [ "tracing-subscriber", "uuid", "vllm-engine-core-client", + "vllm-managed-engine", "vllm-server", ] @@ -5530,6 +5587,18 @@ dependencies = [ "zeromq", ] +[[package]] +name = "vllm-managed-engine" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "expect-test", + "libc", + "tokio", + "tracing", +] + [[package]] name = "vllm-metrics" version = "0.1.0" @@ -5817,6 +5886,17 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" +[[package]] +name = "win_uds" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd30a1a28a3799479cbf4e17284a220ea9ff6bad098a9d0224543a5d1efe1da" +dependencies = [ + "async-io", + "futures-io", + "socket2", +] + [[package]] name = "winapi" version = "0.3.9" @@ -6270,8 +6350,9 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zeromq" -version = "0.6.0-pre.1" -source = "git+https://github.com/BugenZhao/zmq.rs?rev=dde4ee5c48846b0cc22b1f4fad8c7514748180fe#dde4ee5c48846b0cc22b1f4fad8c7514748180fe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb2c254fd8f366755335c9e43b865f8484fe3bd717d65ffe7c3f28852863030" dependencies = [ "async-trait", "asynchronous-codec", @@ -6289,6 +6370,7 @@ dependencies = [ "tokio", "tokio-util", "uuid", + "win_uds", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5faf22e..fddb52d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/cmd", "src/engine-core-client", "src/llm", + "src/managed-engine", "src/metrics", "src/server", "src/text", @@ -95,14 +96,15 @@ validator = { version = "0.20.0", features = ["derive"] } vllm-chat = { path = "src/chat" } vllm-engine-core-client = { path = "src/engine-core-client" } vllm-llm = { path = "src/llm" } +vllm-managed-engine = { path = "src/managed-engine" } vllm-metrics = { path = "src/metrics" } vllm-server = { path = "src/server" } vllm-text = { path = "src/text" } winnow = "1.0.2" -zeromq = { git = "https://github.com/BugenZhao/zmq.rs", rev = "dde4ee5c48846b0cc22b1f4fad8c7514748180fe", default-features = false, features = [ +zeromq = { version = "0.6.0", default-features = false, features = [ "tokio-runtime", "all-transport", -] } # TODO: switch to crates.io version after the patch gets released +] } [workspace.lints.clippy] too_many_arguments = "allow" diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index f2c2ca8..5f1e3ee 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,19 +16,19 @@ anyhow.workspace = true clap.workspace = true educe.workspace = true itertools.workspace = true -libc.workspace = true native-tls-vendored = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true serde_with.workspace = true thiserror-ext.workspace = true time.workspace = true -tokio = { workspace = true, features = ["process", "signal"] } +tokio = { workspace = true, features = ["signal"] } tokio-util.workspace = true tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true vllm-engine-core-client.workspace = true +vllm-managed-engine.workspace = true vllm-server.workspace = true [dev-dependencies] diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 0458425..05227a2 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -4,7 +4,6 @@ //! - Engine args: //! - Environment variables: -mod serve_validate; mod unsupported; use std::collections::HashMap; @@ -20,13 +19,14 @@ use serde_json::Value; use thiserror_ext::AsReport as _; use uuid::Uuid; use vllm_engine_core_client::TransportMode; +use vllm_managed_engine::ManagedEngineConfig; +use vllm_managed_engine::cli::{ManagedEngineArgs, repartition_managed_engine_args}; use vllm_server::{ ChatTemplateContentFormatOption, Config, CoordinatorMode, HttpListenerMode, ParserSelection, RendererSelection, }; use crate::cli::unsupported::UnsupportedArgs; -use crate::managed_engine::ManagedEngineConfig; /// Top-level parser for the `vllm-rs` binary. #[derive(Debug, Parser)] @@ -50,7 +50,7 @@ impl Cli { T: Into, { let args: Vec = itr.into_iter().map(Into::into).collect(); - let repartitioned_args = serve_validate::repartition_serve_args(&args)?; + let repartitioned_args = repartition_managed_engine_args::(&args, Some("serve"))?; ::try_parse_from(&repartitioned_args).inspect(|cli| { if let Command::Serve(serve) = &cli.command && serve.debug_cli @@ -63,7 +63,10 @@ impl Cli { "Repartitioned CLI args: {}\n", repartitioned_args.join(OsStr::new(" ")).display() ); - println!("Passthrough Python args: {}", serve.python_args.join(" ")); + println!( + "Passthrough Python args: {}", + serve.managed_engine.python_args.join(" ") + ); std::process::exit(0); } }) @@ -350,9 +353,6 @@ pub struct ServeArgs { /// frontend. #[arg(long)] pub headless: bool, - /// Python executable used to launch the managed headless vLLM engine. - #[arg(long, env = "VLLM_RS_PYTHON", default_value = "python3")] - pub python: String, /// HTTP bind host for the OpenAI-compatible server. #[arg(long, default_value = "127.0.0.1")] pub host: String, @@ -362,30 +362,6 @@ pub struct ServeArgs { /// Unix domain socket path. If set, host and port arguments are ignored. #[arg(long)] pub uds: Option, - /// Host/IP used both for the managed-engine handshake endpoint and the - /// frontend-advertised input/output ZMQ socket addresses. - #[arg( - long = "data-parallel-address", - visible_alias = "handshake-host", - default_value = "127.0.0.1" - )] - pub handshake_host: String, - /// Optional TCP port for the managed-engine handshake / data-parallel RPC - /// endpoint. - /// - /// When omitted, the CLI allocates an ephemeral port automatically. - #[arg( - long = "data-parallel-rpc-port", - visible_alias = "handshake-port", - value_parser = clap::value_parser!(u16).range(1..) - )] - pub handshake_port: Option, - /// Number of data parallel replicas across the whole deployment. - #[arg(long, default_value_t = 1)] - pub data_parallel_size: usize, - /// Number of data parallel replicas to run on this node. - #[arg(long)] - pub data_parallel_size_local: Option, /// Flag to print debug information about CLI argument parsing and exit. #[educe(Debug(ignore))] @@ -396,33 +372,18 @@ pub struct ServeArgs { #[command(flatten)] pub runtime: SharedRuntimeArgs, - /// Additional arguments forwarded to `python -m vllm.entrypoints.cli.main - /// serve ...`. - /// - /// Arguments after an explicit `--` are forwarded verbatim. Before `--`, - /// `vllm-rs serve` automatically keeps recognized frontend options on - /// the Rust side and forwards everything else to Python. - #[arg( - last = true, - allow_hyphen_values = true, - help_heading = "Passthrough arguments" - )] - pub python_args: Vec, + /// Managed Python headless-engine arguments. + #[command(flatten)] + pub managed_engine: ManagedEngineArgs, } impl ServeArgs { - /// Build the handshake address shared by the Rust frontend and managed - /// Python engine. - pub fn handshake_address(&self, handshake_port: u16) -> String { - format!("tcp://{}:{}", self.handshake_host, handshake_port) - } - /// Build the OpenAI-server runtime config used after the managed Python /// engine starts. pub fn to_frontend_config(&self, handshake_address: String) -> Config { // Prefer IPC sockets for local engine input/output. let (local_input_address, local_output_address) = - self.frontend_local_only().then(frontend_ipc_addresses).unzip(); + self.managed_engine.frontend_local_only().then(frontend_ipc_addresses).unzip(); let listener_mode = match &self.uds { Some(path) => HttpListenerMode::BindUnix { path: path.clone() }, None => HttpListenerMode::BindTcp { @@ -434,46 +395,21 @@ impl ServeArgs { self.runtime.clone().into_managed_config( listener_mode, handshake_address, - self.handshake_host.clone(), - self.data_parallel_size, + self.managed_engine.handshake_host.clone(), + self.managed_engine.data_parallel_size, local_input_address, local_output_address, ) } - /// Build the managed Python-engine spawn configuration for one resolved + /// Build the managed Python-engine spawn configuration with the given /// handshake port. - pub fn into_managed_engine_config(self, handshake_port: u16) -> ManagedEngineConfig { - let mut python_args = self.python_args; - // Manually forward some args to the Python engine. - if let Some(max_model_len) = self.runtime.max_model_len { - python_args.push("--max-model-len".to_string()); - python_args.push(max_model_len.to_string()); - } - if let Some(data_parallel_size_local) = self.data_parallel_size_local { - python_args.push("--data-parallel-size-local".to_string()); - python_args.push(data_parallel_size_local.to_string()); - } - - ManagedEngineConfig { - python: self.python, - model: self.runtime.model, - handshake_host: self.handshake_host, + pub fn to_managed_engine_config(&self, handshake_port: u16) -> ManagedEngineConfig { + self.managed_engine.clone().into_config( + self.runtime.model.clone(), + self.runtime.max_model_len, handshake_port, - data_parallel_size: self.data_parallel_size, - python_args, - } - } - - fn local_engine_count(&self) -> usize { - self.data_parallel_size_local.unwrap_or(self.data_parallel_size) - } - - /// Return whether the managed Rust frontend only needs to communicate with - /// colocated engines. - fn frontend_local_only(&self) -> bool { - self.data_parallel_size_local != Some(0) - && self.local_engine_count() == self.data_parallel_size + ) } } diff --git a/src/cmd/src/cli/serve_validate.rs b/src/cmd/src/cli/serve_validate.rs deleted file mode 100644 index aa14694..0000000 --- a/src/cmd/src/cli/serve_validate.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::collections::HashSet; -use std::ffi::OsString; - -use clap::CommandFactory as _; -use clap::error::ErrorKind; - -use crate::cli::Cli; - -/// Python `argparse` accepts these multi-character single-dash aliases, but -/// `clap` cannot model them directly. -const PYTHON_MULTI_CHAR_ALIASES: &[(&str, &str)] = &[ - ("-asc", "--api-server-count"), - ("-pp", "--pipeline-parallel-size"), - ("-tp", "--tensor-parallel-size"), - ("-dcp", "--decode-context-parallel-size"), - ("-pcp", "--prefill-context-parallel-size"), - ("-dp", "--data-parallel-size"), - ("-dpn", "--data-parallel-rank"), - ("-dpr", "--data-parallel-start-rank"), - ("-dpl", "--data-parallel-size-local"), - ("-dpa", "--data-parallel-address"), - ("-dpp", "--data-parallel-rpc-port"), - ("-dpb", "--data-parallel-backend"), - ("-dph", "--data-parallel-hybrid-lb"), - ("-dpe", "--data-parallel-external-lb"), - ("-ep", "--enable-expert-parallel"), - ("-cc", "--compilation-config"), - ("-ac", "--attention-config"), -]; - -/// Repartition `serve` argv so Rust frontend-owned flags stay before `--`, -/// while everything else is forwarded to Python. -pub(super) fn repartition_serve_args(args: &[OsString]) -> Result, clap::Error> { - if args.get(1).map(|arg| arg.as_os_str()) != Some("serve".as_ref()) { - return Ok(args.to_vec()); - } - - if args.get(2).is_none() { - return Ok(args.to_vec()); - } - - let model = args[2].to_string_lossy(); - if is_help_flag(&model) { - return Ok(args.to_vec()); - } - if model == "--" || is_option_like(&model) { - return Err(build_missing_model_error()); - } - - let (front_args, explicit_passthrough, had_separator) = split_serve_args(&args[3..]); - let normalized_front_args = normalize_python_arg_aliases(front_args); - let (long_flags, short_flags) = collect_frontend_option_names(); - - let mut frontend_chunks = Vec::new(); - let mut python_chunks = Vec::new(); - let mut current_chunk = Vec::new(); - - for arg in normalized_front_args { - let text = arg.to_string_lossy(); - if is_option_like(&text) && !current_chunk.is_empty() { - push_chunk( - &mut frontend_chunks, - &mut python_chunks, - std::mem::take(&mut current_chunk), - &long_flags, - &short_flags, - ); - } - current_chunk.push(arg); - } - if !current_chunk.is_empty() { - push_chunk( - &mut frontend_chunks, - &mut python_chunks, - current_chunk, - &long_flags, - &short_flags, - ); - } - - let mut repartitioned = vec![args[0].clone(), args[1].clone(), args[2].clone()]; - repartitioned.extend(frontend_chunks); - if had_separator || !python_chunks.is_empty() || !explicit_passthrough.is_empty() { - repartitioned.push("--".into()); - repartitioned.extend(python_chunks); - repartitioned.extend(explicit_passthrough.iter().cloned()); - } - - Ok(repartitioned) -} - -fn split_serve_args(args: &[OsString]) -> (&[OsString], &[OsString], bool) { - if let Some(index) = args.iter().position(|arg| arg == "--") { - (&args[..index], &args[index + 1..], true) - } else { - (args, &[], false) - } -} - -fn normalize_python_arg_aliases(args: &[OsString]) -> Vec { - args.iter() - .map(|arg| { - let text = arg.to_string_lossy(); - normalize_python_multi_char_alias(&text) - .map(Into::into) - .unwrap_or_else(|| arg.clone()) - }) - .collect() -} - -fn normalize_python_multi_char_alias(arg: &str) -> Option { - find_python_multi_char_alias(arg).map(|canonical| match arg.split_once('=') { - Some((_, value)) => format!("{canonical}={value}"), - None => canonical.to_string(), - }) -} - -fn find_python_multi_char_alias(arg: &str) -> Option<&'static str> { - PYTHON_MULTI_CHAR_ALIASES.iter().find_map(|&(alias, canonical)| { - (arg == alias || arg.starts_with(&format!("{alias}="))).then_some(canonical) - }) -} - -fn push_chunk( - frontend_chunks: &mut Vec, - python_chunks: &mut Vec, - chunk: Vec, - long_flags: &HashSet, - short_flags: &HashSet, -) { - if chunk_head_is_frontend_owned(&chunk, long_flags, short_flags) { - frontend_chunks.extend(chunk); - } else { - python_chunks.extend(chunk); - } -} - -fn chunk_head_is_frontend_owned( - chunk: &[OsString], - long_flags: &HashSet, - short_flags: &HashSet, -) -> bool { - let Some(head) = chunk.first() else { - return false; - }; - let head = head.to_string_lossy(); - - if let Some(rest) = head.strip_prefix("--") { - let name = rest.split_once('=').map_or(rest, |(name, _)| name); - return long_flags.contains(name); - } - - let Some(rest) = head.strip_prefix('-') else { - return false; - }; - let Some(short) = rest.chars().next() else { - return false; - }; - short_flags.contains(&short) -} - -fn collect_frontend_option_names() -> (HashSet, HashSet) { - let mut command = Cli::command(); - let serve_command = - command.find_subcommand_mut("serve").expect("serve subcommand should exist"); - - let mut long_flags = HashSet::new(); - let mut short_flags = HashSet::new(); - for arg in serve_command.get_arguments() { - if let Some(names) = arg.get_long_and_visible_aliases() { - long_flags.extend(names.into_iter().map(str::to_owned)); - } - if let Some(short) = arg.get_short() { - short_flags.insert(short); - } - if let Some(short_aliases) = arg.get_visible_short_aliases() { - short_flags.extend(short_aliases); - } - } - - long_flags.insert("help".to_string()); - short_flags.insert('h'); - - (long_flags, short_flags) -} - -fn is_option_like(arg: &str) -> bool { - if arg == "--" { - return false; - } - - if let Some(rest) = arg.strip_prefix("--") { - return rest.chars().next().is_some_and(char::is_alphabetic); - } - - if let Some(rest) = arg.strip_prefix('-') { - return rest.chars().next().is_some_and(char::is_alphabetic); - } - - false -} - -fn is_help_flag(arg: &str) -> bool { - arg == "-h" || arg == "--help" -} - -fn build_missing_model_error() -> clap::Error { - let mut command = Cli::command(); - let serve_command = - command.find_subcommand_mut("serve").expect("serve subcommand should exist"); - serve_command.error( - ErrorKind::MissingRequiredArgument, - "serve requires the model to appear immediately after the subcommand", - ) -} diff --git a/src/cmd/src/cli/tests.rs b/src/cmd/src/cli/tests.rs index 9d9e1dc..383f379 100644 --- a/src/cmd/src/cli/tests.rs +++ b/src/cmd/src/cli/tests.rs @@ -25,14 +25,9 @@ fn serve_args_forward_python_flags_with_separator() { command: Serve( ServeArgs { headless: false, - python: "../vllm/.venv/bin/python", host: "127.0.0.1", port: 8000, uds: None, - handshake_host: "127.0.0.1", - handshake_port: None, - data_parallel_size: 1, - data_parallel_size_local: None, runtime: SharedRuntimeArgs { model: "Qwen/Qwen3-0.6B", engine_ready_timeout_secs: 600, @@ -51,10 +46,17 @@ fn serve_args_forward_python_flags_with_separator() { disable_log_stats: false, served_model_name: [], }, - python_args: [ - "--dtype", - "float16", - ], + managed_engine: ManagedEngineArgs { + python: "../vllm/.venv/bin/python", + handshake_host: "127.0.0.1", + handshake_port: None, + data_parallel_size: 1, + data_parallel_size_local: None, + python_args: [ + "--dtype", + "float16", + ], + }, }, ), } @@ -78,7 +80,7 @@ fn serve_args_auto_forward_python_flags_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--dtype", "float16"]); + assert_eq!(args.managed_engine.python_args, vec!["--dtype", "float16"]); } #[test] @@ -88,7 +90,10 @@ fn serve_args_auto_forward_python_multi_char_alias_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--tensor-parallel-size", "2"]); + assert_eq!( + args.managed_engine.python_args, + vec!["--tensor-parallel-size", "2"] + ); } #[test] @@ -427,7 +432,7 @@ fn serve_args_reject_flags_before_model() { .unwrap_err(); expect![[r#" - error: serve requires the model to appear immediately after the subcommand + error: the model must appear immediately after the command Usage: vllm-rs serve [OPTIONS] [-- ...] @@ -466,7 +471,7 @@ fn serve_args_keep_python_passthrough_flags_after_separator() { panic!("expected serve args"); }; assert_eq!( - args.python_args, + args.managed_engine.python_args, vec!["--tensor-parallel-size", "2", "--dtype", "float16"] ); } @@ -490,7 +495,10 @@ fn serve_args_keep_python_multi_char_alias_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["-tp", "2", "--dtype", "float16"]); + assert_eq!( + args.managed_engine.python_args, + vec!["-tp", "2", "--dtype", "float16"] + ); } #[test] @@ -508,7 +516,10 @@ fn serve_args_keep_frontend_arg_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--uds", "/tmp/vllm.sock"]); + assert_eq!( + args.managed_engine.python_args, + vec!["--uds", "/tmp/vllm.sock"] + ); } #[test] @@ -528,7 +539,10 @@ fn serve_args_keep_python_multi_char_engine_aliases_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["-dpr", "1", "-dpl", "2"]); + assert_eq!( + args.managed_engine.python_args, + vec!["-dpr", "1", "-dpl", "2"] + ); } #[test] @@ -538,7 +552,7 @@ fn serve_args_auto_forward_unknown_flags_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--foo", "bar"]); + assert_eq!(args.managed_engine.python_args, vec!["--foo", "bar"]); } #[test] @@ -549,7 +563,7 @@ fn serve_args_auto_forward_negative_value_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--dtype", "-1"]); + assert_eq!(args.managed_engine.python_args, vec!["--dtype", "-1"]); } #[test] @@ -574,16 +588,9 @@ fn serve_args_accept_handshake_aliases() { command: Serve( ServeArgs { headless: false, - python: "python3", host: "127.0.0.1", port: 8000, uds: None, - handshake_host: "10.99.48.128", - handshake_port: Some( - 13345, - ), - data_parallel_size: 4, - data_parallel_size_local: None, runtime: SharedRuntimeArgs { model: "Qwen/Qwen3-0.6B", engine_ready_timeout_secs: 600, @@ -600,7 +607,16 @@ fn serve_args_accept_handshake_aliases() { disable_log_stats: false, served_model_name: [], }, - python_args: [], + managed_engine: ManagedEngineArgs { + python: "python3", + handshake_host: "10.99.48.128", + handshake_port: Some( + 13345, + ), + data_parallel_size: 4, + data_parallel_size_local: None, + python_args: [], + }, }, ), } @@ -627,9 +643,9 @@ fn serve_args_accept_data_parallel_primary_flags() { panic!("expected serve args"); }; assert!(!args.headless); - assert_eq!(args.handshake_host, "10.99.48.128"); - assert_eq!(args.handshake_port, Some(13345)); - assert_eq!(args.data_parallel_size, 4); + assert_eq!(args.managed_engine.handshake_host, "10.99.48.128"); + assert_eq!(args.managed_engine.handshake_port, Some(13345)); + assert_eq!(args.managed_engine.data_parallel_size, 4); } #[test] diff --git a/src/cmd/src/main.rs b/src/cmd/src/main.rs index 1a744ee..ce4e37e 100644 --- a/src/cmd/src/main.rs +++ b/src/cmd/src/main.rs @@ -1,6 +1,5 @@ mod cli; mod logging; -mod managed_engine; use std::env; use std::process::ExitStatus; @@ -8,9 +7,9 @@ use std::process::ExitStatus; use anyhow::{Context, Result, anyhow, bail}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; +use vllm_managed_engine::ManagedEngineHandle; use crate::cli::{Cli, Command}; -use crate::managed_engine::{ManagedEngineHandle, allocate_handshake_port}; const TOKIO_WORKER_THREADS_ENV: &str = "TOKIO_WORKER_THREADS"; const DEFAULT_MAX_TOKIO_WORKER_THREADS: usize = 32; @@ -96,20 +95,17 @@ async fn async_main(cli: Cli) -> Result<()> { match cli.command { Command::Frontend(args) => vllm_server::serve(args.into_config(), shutdown_signal()).await, Command::Serve(args) => { - let handshake_port = match args.handshake_port { - Some(port) => port, - None => allocate_handshake_port(&args.handshake_host)?, - }; + let handshake_port = args.managed_engine.resolve_handshake_port()?; - if args.data_parallel_size_local == Some(0) { + if args.managed_engine.data_parallel_size_local == Some(0) { if args.headless { bail!("cannot combine `--headless` with `--data-parallel-size-local 0`"); } - let handshake_address = args.handshake_address(handshake_port); + let handshake_address = args.managed_engine.handshake_address(handshake_port); info!( %handshake_address, - engine_count = args.data_parallel_size, + engine_count = args.managed_engine.data_parallel_size, "running Rust frontend without a managed local Python engine" ); let config = args.to_frontend_config(handshake_address); @@ -117,7 +113,7 @@ async fn async_main(cli: Cli) -> Result<()> { } let shutdown_timeout = args.runtime.shutdown_timeout(); - let engine_config = args.clone().into_managed_engine_config(handshake_port); + let engine_config = args.to_managed_engine_config(handshake_port); let handshake_address = engine_config.handshake_address(); let engine = ManagedEngineHandle::spawn(engine_config) diff --git a/src/managed-engine/Cargo.toml b/src/managed-engine/Cargo.toml new file mode 100644 index 0000000..7578f7f --- /dev/null +++ b/src/managed-engine/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "vllm-managed-engine" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +libc.workspace = true +tokio = { workspace = true, features = ["process"] } +tracing.workspace = true + +[dev-dependencies] +expect-test.workspace = true + +[lints] +workspace = true diff --git a/src/managed-engine/src/cli.rs b/src/managed-engine/src/cli.rs new file mode 100644 index 0000000..302737d --- /dev/null +++ b/src/managed-engine/src/cli.rs @@ -0,0 +1,349 @@ +use std::collections::HashSet; +use std::ffi::OsString; + +use clap::error::ErrorKind; +use clap::{Args, CommandFactory}; + +use crate::{ManagedEngineConfig, allocate_handshake_port}; + +/// Managed Python headless-engine CLI arguments. +#[derive(Debug, Clone, Args, PartialEq, Eq)] +pub struct ManagedEngineArgs { + /// Python executable used to launch the managed headless vLLM engine. + #[arg(long, env = "VLLM_RS_PYTHON", default_value = "python3")] + pub python: String, + /// Host/IP used both for the managed-engine handshake endpoint and the + /// frontend-advertised input/output ZMQ socket addresses. + #[arg( + long = "data-parallel-address", + visible_alias = "handshake-host", + default_value = "127.0.0.1" + )] + pub handshake_host: String, + /// Optional TCP port for the managed-engine handshake / data-parallel RPC + /// endpoint. + /// + /// When omitted, the CLI allocates an ephemeral port automatically. + #[arg( + long = "data-parallel-rpc-port", + visible_alias = "handshake-port", + value_parser = clap::value_parser!(u16).range(1..) + )] + pub handshake_port: Option, + /// Number of data parallel replicas across the whole deployment. + #[arg(long, default_value_t = 1)] + pub data_parallel_size: usize, + /// Number of data parallel replicas to run on this node. + #[arg(long)] + pub data_parallel_size_local: Option, + + /// Additional arguments forwarded to `python -m vllm.entrypoints.cli.main + /// serve ...`. + /// + /// Arguments after an explicit `--` are forwarded verbatim. Before `--`, + /// `vllm-rs serve` automatically keeps recognized frontend options on + /// the Rust side and forwards everything else to Python. + #[arg( + last = true, + allow_hyphen_values = true, + help_heading = "Passthrough arguments" + )] + pub python_args: Vec, +} + +impl ManagedEngineArgs { + /// Build the handshake address shared by the Rust frontend and managed + /// Python engine. + pub fn handshake_address(&self, handshake_port: u16) -> String { + format!("tcp://{}:{}", self.handshake_host, handshake_port) + } + + /// Resolve the handshake port, either from the CLI argument (if specified) + /// or by allocating a fresh port. + pub fn resolve_handshake_port(&self) -> anyhow::Result { + self.handshake_port + .map(Ok) + .unwrap_or_else(|| allocate_handshake_port(&self.handshake_host)) + } + + /// Build the managed Python-engine spawn configuration. + pub fn into_config( + self, + model: String, + max_model_len: Option, + handshake_port: u16, + ) -> ManagedEngineConfig { + let mut python_args = self.python_args; + // Manually forward some args to the Python engine. + if let Some(max_model_len) = max_model_len { + python_args.push("--max-model-len".to_string()); + python_args.push(max_model_len.to_string()); + } + if let Some(data_parallel_size_local) = self.data_parallel_size_local { + python_args.push("--data-parallel-size-local".to_string()); + python_args.push(data_parallel_size_local.to_string()); + } + + ManagedEngineConfig { + python: self.python, + model, + handshake_host: self.handshake_host, + handshake_port, + data_parallel_size: self.data_parallel_size, + python_args, + } + } + + /// Return the number of engines that the Rust frontend should expect to + /// coordinate with. + fn local_engine_count(&self) -> usize { + self.data_parallel_size_local.unwrap_or(self.data_parallel_size) + } + + /// Return whether the managed Rust frontend only needs to communicate with + /// colocated engines. + pub fn frontend_local_only(&self) -> bool { + self.data_parallel_size_local != Some(0) + && self.local_engine_count() == self.data_parallel_size + } +} + +/// Python `argparse` accepts these multi-character single-dash aliases, but +/// `clap` cannot model them directly. +const PYTHON_MULTI_CHAR_ALIASES: &[(&str, &str)] = &[ + ("-asc", "--api-server-count"), + ("-pp", "--pipeline-parallel-size"), + ("-tp", "--tensor-parallel-size"), + ("-dcp", "--decode-context-parallel-size"), + ("-pcp", "--prefill-context-parallel-size"), + ("-dp", "--data-parallel-size"), + ("-dpn", "--data-parallel-rank"), + ("-dpr", "--data-parallel-start-rank"), + ("-dpl", "--data-parallel-size-local"), + ("-dpa", "--data-parallel-address"), + ("-dpp", "--data-parallel-rpc-port"), + ("-dpb", "--data-parallel-backend"), + ("-dph", "--data-parallel-hybrid-lb"), + ("-dpe", "--data-parallel-external-lb"), + ("-ep", "--enable-expert-parallel"), + ("-cc", "--compilation-config"), + ("-ac", "--attention-config"), +]; + +/// Repartition managed-engine argv so Rust-owned flags stay before `--`, while +/// everything else is forwarded to Python. +pub fn repartition_managed_engine_args( + args: &[OsString], + subcommand: Option<&str>, +) -> Result, clap::Error> +where + C: CommandFactory, +{ + let command = C::command(); + let (prefix, real_args, command) = match subcommand { + Some(subcommand) => { + if !matches_subcommand(args, subcommand) { + return Ok(args.to_vec()); + }; + + let subcommand = command + .find_subcommand(subcommand) + .expect("managed-engine subcommand should exist"); + + (args[..2].to_vec(), &args[2..], subcommand) + } + None => { + let Some(program) = args.first() else { + return Ok(args.to_vec()); + }; + + (vec![program.clone()], &args[1..], &command) + } + }; + + let mut repartitioned = prefix; + repartitioned.extend(repartition_real_managed_engine_args(real_args, command)?); + Ok(repartitioned) +} + +fn repartition_real_managed_engine_args( + args: &[OsString], + command: &clap::Command, +) -> Result, clap::Error> { + let Some(model) = args.first() else { + return Ok(args.to_vec()); + }; + + let model = model.to_string_lossy(); + if is_help_flag(&model) { + return Ok(args.to_vec()); + } + if model == "--" || is_option_like(&model) { + return Err(build_missing_model_error(command)); + } + + let (long_flags, short_flags) = collect_option_names(command); + let (front_args, explicit_passthrough, had_separator) = split_managed_engine_args(&args[1..]); + let normalized_front_args = normalize_python_arg_aliases(front_args); + + let mut frontend_chunks = Vec::new(); + let mut python_chunks = Vec::new(); + let mut current_chunk = Vec::new(); + + for arg in normalized_front_args { + let text = arg.to_string_lossy(); + if is_option_like(&text) && !current_chunk.is_empty() { + push_chunk( + &mut frontend_chunks, + &mut python_chunks, + std::mem::take(&mut current_chunk), + &long_flags, + &short_flags, + ); + } + current_chunk.push(arg); + } + if !current_chunk.is_empty() { + push_chunk( + &mut frontend_chunks, + &mut python_chunks, + current_chunk, + &long_flags, + &short_flags, + ); + } + + let mut repartitioned = vec![args[0].clone()]; + repartitioned.extend(frontend_chunks); + if had_separator || !python_chunks.is_empty() || !explicit_passthrough.is_empty() { + repartitioned.push("--".into()); + repartitioned.extend(python_chunks); + repartitioned.extend(explicit_passthrough.iter().cloned()); + } + + Ok(repartitioned) +} + +fn matches_subcommand(args: &[OsString], subcommand: &str) -> bool { + args.get(1) + .and_then(|arg| arg.to_str()) + .is_some_and(|candidate| candidate == subcommand) +} + +fn split_managed_engine_args(args: &[OsString]) -> (&[OsString], &[OsString], bool) { + if let Some(index) = args.iter().position(|arg| arg == "--") { + (&args[..index], &args[index + 1..], true) + } else { + (args, &[], false) + } +} + +fn normalize_python_arg_aliases(args: &[OsString]) -> Vec { + args.iter() + .map(|arg| { + let text = arg.to_string_lossy(); + normalize_python_multi_char_alias(&text) + .map(Into::into) + .unwrap_or_else(|| arg.clone()) + }) + .collect() +} + +fn normalize_python_multi_char_alias(arg: &str) -> Option { + find_python_multi_char_alias(arg).map(|canonical| match arg.split_once('=') { + Some((_, value)) => format!("{canonical}={value}"), + None => canonical.to_string(), + }) +} + +fn find_python_multi_char_alias(arg: &str) -> Option<&'static str> { + PYTHON_MULTI_CHAR_ALIASES.iter().find_map(|&(alias, canonical)| { + (arg == alias || arg.starts_with(&format!("{alias}="))).then_some(canonical) + }) +} + +fn push_chunk( + frontend_chunks: &mut Vec, + python_chunks: &mut Vec, + chunk: Vec, + long_flags: &HashSet, + short_flags: &HashSet, +) { + if chunk_head_is_frontend_owned(&chunk, long_flags, short_flags) { + frontend_chunks.extend(chunk); + } else { + python_chunks.extend(chunk); + } +} + +fn chunk_head_is_frontend_owned( + chunk: &[OsString], + long_flags: &HashSet, + short_flags: &HashSet, +) -> bool { + let Some(head) = chunk.first() else { + return false; + }; + let head = head.to_string_lossy(); + + if let Some(rest) = head.strip_prefix("--") { + let name = rest.split_once('=').map_or(rest, |(name, _)| name); + return long_flags.contains(name); + } + + let Some(rest) = head.strip_prefix('-') else { + return false; + }; + let Some(short) = rest.chars().next() else { + return false; + }; + short_flags.contains(&short) +} + +fn collect_option_names(command: &clap::Command) -> (HashSet, HashSet) { + let mut long_flags = HashSet::new(); + let mut short_flags = HashSet::new(); + for arg in command.get_arguments() { + if let Some(names) = arg.get_long_and_visible_aliases() { + long_flags.extend(names.into_iter().map(str::to_owned)); + } + if let Some(short) = arg.get_short() { + short_flags.insert(short); + } + if let Some(short_aliases) = arg.get_visible_short_aliases() { + short_flags.extend(short_aliases); + } + } + + long_flags.insert("help".to_string()); + short_flags.insert('h'); + + (long_flags, short_flags) +} + +fn is_option_like(arg: &str) -> bool { + if arg == "--" { + return false; + } + + if let Some(rest) = arg.strip_prefix("--") { + return rest.chars().next().is_some_and(char::is_alphabetic); + } + + if let Some(rest) = arg.strip_prefix('-') { + return rest.chars().next().is_some_and(char::is_alphabetic); + } + + false +} + +fn is_help_flag(arg: &str) -> bool { + arg == "-h" || arg == "--help" +} + +fn build_missing_model_error(command: &clap::Command) -> clap::Error { + command.clone().error( + ErrorKind::MissingRequiredArgument, + "the model must appear immediately after the command", + ) +} diff --git a/src/managed-engine/src/lib.rs b/src/managed-engine/src/lib.rs new file mode 100644 index 0000000..e981210 --- /dev/null +++ b/src/managed-engine/src/lib.rs @@ -0,0 +1,4 @@ +pub mod cli; +mod process; + +pub use process::{ManagedEngineConfig, ManagedEngineHandle, allocate_handshake_port}; diff --git a/src/cmd/src/managed_engine.rs b/src/managed-engine/src/process.rs similarity index 100% rename from src/cmd/src/managed_engine.rs rename to src/managed-engine/src/process.rs