From fee8ea1d4d72e18ef529c75fbeb0462dd778b529 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 6 May 2026 16:43:25 +0800 Subject: [PATCH 1/3] refactor: extract `vllm-managed-engine` crate Signed-off-by: Bugen Zhao --- Cargo.lock | 14 +- Cargo.toml | 2 + src/cmd/Cargo.toml | 4 +- src/cmd/src/cli.rs | 102 ++--------- src/cmd/src/cli/tests.rs | 72 +++++--- src/cmd/src/main.rs | 16 +- src/managed-engine/Cargo.toml | 17 ++ .../src/cli.rs} | 168 +++++++++++++++--- src/managed-engine/src/lib.rs | 4 + .../src/process.rs} | 0 10 files changed, 253 insertions(+), 146 deletions(-) create mode 100644 src/managed-engine/Cargo.toml rename src/{cmd/src/cli/serve_validate.rs => managed-engine/src/cli.rs} (50%) create mode 100644 src/managed-engine/src/lib.rs rename src/{cmd/src/managed_engine.rs => managed-engine/src/process.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 8fb8f14..9da5315 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5454,7 +5454,6 @@ dependencies = [ "educe", "expect-test", "itertools 0.14.0", - "libc", "native-tls", "serde", "serde_json", @@ -5467,6 +5466,7 @@ dependencies = [ "tracing-subscriber", "uuid", "vllm-engine-core-client", + "vllm-managed-engine", "vllm-server", ] @@ -5530,6 +5530,18 @@ dependencies = [ "zeromq", ] +[[package]] +name = "vllm-managed-engine" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "expect-test", + "libc", + "tokio", + "tracing", +] + [[package]] name = "vllm-metrics" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5faf22e..e47175f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/cmd", "src/engine-core-client", "src/llm", + "src/managed-engine", "src/metrics", "src/server", "src/text", @@ -95,6 +96,7 @@ validator = { version = "0.20.0", features = ["derive"] } vllm-chat = { path = "src/chat" } vllm-engine-core-client = { path = "src/engine-core-client" } vllm-llm = { path = "src/llm" } +vllm-managed-engine = { path = "src/managed-engine" } vllm-metrics = { path = "src/metrics" } vllm-server = { path = "src/server" } vllm-text = { path = "src/text" } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index f2c2ca8..5f1e3ee 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,19 +16,19 @@ anyhow.workspace = true clap.workspace = true educe.workspace = true itertools.workspace = true -libc.workspace = true native-tls-vendored = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true serde_with.workspace = true thiserror-ext.workspace = true time.workspace = true -tokio = { workspace = true, features = ["process", "signal"] } +tokio = { workspace = true, features = ["signal"] } tokio-util.workspace = true tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true vllm-engine-core-client.workspace = true +vllm-managed-engine.workspace = true vllm-server.workspace = true [dev-dependencies] diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 15f9d98..551c89f 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -4,7 +4,6 @@ //! - Engine args: //! - Environment variables: -mod serve_validate; mod unsupported; use std::collections::HashMap; @@ -20,13 +19,14 @@ use serde_json::Value; use thiserror_ext::AsReport as _; use uuid::Uuid; use vllm_engine_core_client::TransportMode; +use vllm_managed_engine::ManagedEngineConfig; +use vllm_managed_engine::cli::{ManagedEngineArgs, repartition_managed_engine_args}; use vllm_server::{ ChatTemplateContentFormatOption, Config, CoordinatorMode, HttpListenerMode, ParserSelection, RendererSelection, }; use crate::cli::unsupported::UnsupportedArgs; -use crate::managed_engine::ManagedEngineConfig; /// Top-level parser for the `vllm-rs` binary. #[derive(Debug, Parser)] @@ -50,7 +50,7 @@ impl Cli { T: Into, { let args: Vec = itr.into_iter().map(Into::into).collect(); - let repartitioned_args = serve_validate::repartition_serve_args(&args)?; + let repartitioned_args = repartition_managed_engine_args::(&args, "serve")?; ::try_parse_from(&repartitioned_args).inspect(|cli| { if let Command::Serve(serve) = &cli.command && serve.debug_cli @@ -63,7 +63,10 @@ impl Cli { "Repartitioned CLI args: {}\n", repartitioned_args.join(OsStr::new(" ")).display() ); - println!("Passthrough Python args: {}", serve.python_args.join(" ")); + println!( + "Passthrough Python args: {}", + serve.managed_engine.python_args.join(" ") + ); std::process::exit(0); } }) @@ -337,9 +340,6 @@ pub struct ServeArgs { /// frontend. #[arg(long)] pub headless: bool, - /// Python executable used to launch the managed headless vLLM engine. - #[arg(long, env = "VLLM_RS_PYTHON", default_value = "python3")] - pub python: String, /// HTTP bind host for the OpenAI-compatible server. #[arg(long, default_value = "127.0.0.1")] pub host: String, @@ -349,30 +349,6 @@ pub struct ServeArgs { /// Unix domain socket path. If set, host and port arguments are ignored. #[arg(long)] pub uds: Option, - /// Host/IP used both for the managed-engine handshake endpoint and the - /// frontend-advertised input/output ZMQ socket addresses. - #[arg( - long = "data-parallel-address", - visible_alias = "handshake-host", - default_value = "127.0.0.1" - )] - pub handshake_host: String, - /// Optional TCP port for the managed-engine handshake / data-parallel RPC - /// endpoint. - /// - /// When omitted, the CLI allocates an ephemeral port automatically. - #[arg( - long = "data-parallel-rpc-port", - visible_alias = "handshake-port", - value_parser = clap::value_parser!(u16).range(1..) - )] - pub handshake_port: Option, - /// Number of data parallel replicas across the whole deployment. - #[arg(long, default_value_t = 1)] - pub data_parallel_size: usize, - /// Number of data parallel replicas to run on this node. - #[arg(long)] - pub data_parallel_size_local: Option, /// Flag to print debug information about CLI argument parsing and exit. #[educe(Debug(ignore))] @@ -383,33 +359,18 @@ pub struct ServeArgs { #[command(flatten)] pub runtime: SharedRuntimeArgs, - /// Additional arguments forwarded to `python -m vllm.entrypoints.cli.main - /// serve ...`. - /// - /// Arguments after an explicit `--` are forwarded verbatim. Before `--`, - /// `vllm-rs serve` automatically keeps recognized frontend options on - /// the Rust side and forwards everything else to Python. - #[arg( - last = true, - allow_hyphen_values = true, - help_heading = "Passthrough arguments" - )] - pub python_args: Vec, + /// Managed Python headless-engine arguments. + #[command(flatten)] + pub managed_engine: ManagedEngineArgs, } impl ServeArgs { - /// Build the handshake address shared by the Rust frontend and managed - /// Python engine. - pub fn handshake_address(&self, handshake_port: u16) -> String { - format!("tcp://{}:{}", self.handshake_host, handshake_port) - } - /// Build the OpenAI-server runtime config used after the managed Python /// engine starts. pub fn to_frontend_config(&self, handshake_address: String) -> Config { // Prefer IPC sockets for local engine input/output. let (local_input_address, local_output_address) = - self.frontend_local_only().then(frontend_ipc_addresses).unzip(); + self.managed_engine.frontend_local_only().then(frontend_ipc_addresses).unzip(); let listener_mode = match &self.uds { Some(path) => HttpListenerMode::BindUnix { path: path.clone() }, None => HttpListenerMode::BindTcp { @@ -421,46 +382,21 @@ impl ServeArgs { self.runtime.clone().into_managed_config( listener_mode, handshake_address, - self.handshake_host.clone(), - self.data_parallel_size, + self.managed_engine.handshake_host.clone(), + self.managed_engine.data_parallel_size, local_input_address, local_output_address, ) } - /// Build the managed Python-engine spawn configuration for one resolved + /// Build the managed Python-engine spawn configuration with the given /// handshake port. - pub fn into_managed_engine_config(self, handshake_port: u16) -> ManagedEngineConfig { - let mut python_args = self.python_args; - // Manually forward some args to the Python engine. - if let Some(max_model_len) = self.runtime.max_model_len { - python_args.push("--max-model-len".to_string()); - python_args.push(max_model_len.to_string()); - } - if let Some(data_parallel_size_local) = self.data_parallel_size_local { - python_args.push("--data-parallel-size-local".to_string()); - python_args.push(data_parallel_size_local.to_string()); - } - - ManagedEngineConfig { - python: self.python, - model: self.runtime.model, - handshake_host: self.handshake_host, + pub fn to_managed_engine_config(&self, handshake_port: u16) -> ManagedEngineConfig { + self.managed_engine.clone().into_config( + self.runtime.model.clone(), + self.runtime.max_model_len, handshake_port, - data_parallel_size: self.data_parallel_size, - python_args, - } - } - - fn local_engine_count(&self) -> usize { - self.data_parallel_size_local.unwrap_or(self.data_parallel_size) - } - - /// Return whether the managed Rust frontend only needs to communicate with - /// colocated engines. - fn frontend_local_only(&self) -> bool { - self.data_parallel_size_local != Some(0) - && self.local_engine_count() == self.data_parallel_size + ) } } diff --git a/src/cmd/src/cli/tests.rs b/src/cmd/src/cli/tests.rs index 8de2adb..61db787 100644 --- a/src/cmd/src/cli/tests.rs +++ b/src/cmd/src/cli/tests.rs @@ -25,14 +25,9 @@ fn serve_args_forward_python_flags_with_separator() { command: Serve( ServeArgs { headless: false, - python: "../vllm/.venv/bin/python", host: "127.0.0.1", port: 8000, uds: None, - handshake_host: "127.0.0.1", - handshake_port: None, - data_parallel_size: 1, - data_parallel_size_local: None, runtime: SharedRuntimeArgs { model: "Qwen/Qwen3-0.6B", engine_ready_timeout_secs: 600, @@ -50,10 +45,17 @@ fn serve_args_forward_python_flags_with_separator() { enable_log_requests: false, disable_log_stats: false, }, - python_args: [ - "--dtype", - "float16", - ], + managed_engine: ManagedEngineArgs { + python: "../vllm/.venv/bin/python", + handshake_host: "127.0.0.1", + handshake_port: None, + data_parallel_size: 1, + data_parallel_size_local: None, + python_args: [ + "--dtype", + "float16", + ], + }, }, ), } @@ -77,7 +79,7 @@ fn serve_args_auto_forward_python_flags_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--dtype", "float16"]); + assert_eq!(args.managed_engine.python_args, vec!["--dtype", "float16"]); } #[test] @@ -87,7 +89,10 @@ fn serve_args_auto_forward_python_multi_char_alias_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--tensor-parallel-size", "2"]); + assert_eq!( + args.managed_engine.python_args, + vec!["--tensor-parallel-size", "2"] + ); } #[test] @@ -464,7 +469,7 @@ fn serve_args_keep_python_passthrough_flags_after_separator() { panic!("expected serve args"); }; assert_eq!( - args.python_args, + args.managed_engine.python_args, vec!["--tensor-parallel-size", "2", "--dtype", "float16"] ); } @@ -488,7 +493,10 @@ fn serve_args_keep_python_multi_char_alias_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["-tp", "2", "--dtype", "float16"]); + assert_eq!( + args.managed_engine.python_args, + vec!["-tp", "2", "--dtype", "float16"] + ); } #[test] @@ -506,7 +514,10 @@ fn serve_args_keep_frontend_arg_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--uds", "/tmp/vllm.sock"]); + assert_eq!( + args.managed_engine.python_args, + vec!["--uds", "/tmp/vllm.sock"] + ); } #[test] @@ -526,7 +537,10 @@ fn serve_args_keep_python_multi_char_engine_aliases_after_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["-dpr", "1", "-dpl", "2"]); + assert_eq!( + args.managed_engine.python_args, + vec!["-dpr", "1", "-dpl", "2"] + ); } #[test] @@ -536,7 +550,7 @@ fn serve_args_auto_forward_unknown_flags_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--foo", "bar"]); + assert_eq!(args.managed_engine.python_args, vec!["--foo", "bar"]); } #[test] @@ -547,7 +561,7 @@ fn serve_args_auto_forward_negative_value_without_separator() { let Command::Serve(args) = cli.command else { panic!("expected serve args"); }; - assert_eq!(args.python_args, vec!["--dtype", "-1"]); + assert_eq!(args.managed_engine.python_args, vec!["--dtype", "-1"]); } #[test] @@ -572,16 +586,9 @@ fn serve_args_accept_handshake_aliases() { command: Serve( ServeArgs { headless: false, - python: "python3", host: "127.0.0.1", port: 8000, uds: None, - handshake_host: "10.99.48.128", - handshake_port: Some( - 13345, - ), - data_parallel_size: 4, - data_parallel_size_local: None, runtime: SharedRuntimeArgs { model: "Qwen/Qwen3-0.6B", engine_ready_timeout_secs: 600, @@ -597,7 +604,16 @@ fn serve_args_accept_handshake_aliases() { enable_log_requests: false, disable_log_stats: false, }, - python_args: [], + managed_engine: ManagedEngineArgs { + python: "python3", + handshake_host: "10.99.48.128", + handshake_port: Some( + 13345, + ), + data_parallel_size: 4, + data_parallel_size_local: None, + python_args: [], + }, }, ), } @@ -624,9 +640,9 @@ fn serve_args_accept_data_parallel_primary_flags() { panic!("expected serve args"); }; assert!(!args.headless); - assert_eq!(args.handshake_host, "10.99.48.128"); - assert_eq!(args.handshake_port, Some(13345)); - assert_eq!(args.data_parallel_size, 4); + assert_eq!(args.managed_engine.handshake_host, "10.99.48.128"); + assert_eq!(args.managed_engine.handshake_port, Some(13345)); + assert_eq!(args.managed_engine.data_parallel_size, 4); } #[test] diff --git a/src/cmd/src/main.rs b/src/cmd/src/main.rs index 1a744ee..ce4e37e 100644 --- a/src/cmd/src/main.rs +++ b/src/cmd/src/main.rs @@ -1,6 +1,5 @@ mod cli; mod logging; -mod managed_engine; use std::env; use std::process::ExitStatus; @@ -8,9 +7,9 @@ use std::process::ExitStatus; use anyhow::{Context, Result, anyhow, bail}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; +use vllm_managed_engine::ManagedEngineHandle; use crate::cli::{Cli, Command}; -use crate::managed_engine::{ManagedEngineHandle, allocate_handshake_port}; const TOKIO_WORKER_THREADS_ENV: &str = "TOKIO_WORKER_THREADS"; const DEFAULT_MAX_TOKIO_WORKER_THREADS: usize = 32; @@ -96,20 +95,17 @@ async fn async_main(cli: Cli) -> Result<()> { match cli.command { Command::Frontend(args) => vllm_server::serve(args.into_config(), shutdown_signal()).await, Command::Serve(args) => { - let handshake_port = match args.handshake_port { - Some(port) => port, - None => allocate_handshake_port(&args.handshake_host)?, - }; + let handshake_port = args.managed_engine.resolve_handshake_port()?; - if args.data_parallel_size_local == Some(0) { + if args.managed_engine.data_parallel_size_local == Some(0) { if args.headless { bail!("cannot combine `--headless` with `--data-parallel-size-local 0`"); } - let handshake_address = args.handshake_address(handshake_port); + let handshake_address = args.managed_engine.handshake_address(handshake_port); info!( %handshake_address, - engine_count = args.data_parallel_size, + engine_count = args.managed_engine.data_parallel_size, "running Rust frontend without a managed local Python engine" ); let config = args.to_frontend_config(handshake_address); @@ -117,7 +113,7 @@ async fn async_main(cli: Cli) -> Result<()> { } let shutdown_timeout = args.runtime.shutdown_timeout(); - let engine_config = args.clone().into_managed_engine_config(handshake_port); + let engine_config = args.to_managed_engine_config(handshake_port); let handshake_address = engine_config.handshake_address(); let engine = ManagedEngineHandle::spawn(engine_config) diff --git a/src/managed-engine/Cargo.toml b/src/managed-engine/Cargo.toml new file mode 100644 index 0000000..7578f7f --- /dev/null +++ b/src/managed-engine/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "vllm-managed-engine" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +libc.workspace = true +tokio = { workspace = true, features = ["process"] } +tracing.workspace = true + +[dev-dependencies] +expect-test.workspace = true + +[lints] +workspace = true diff --git a/src/cmd/src/cli/serve_validate.rs b/src/managed-engine/src/cli.rs similarity index 50% rename from src/cmd/src/cli/serve_validate.rs rename to src/managed-engine/src/cli.rs index aa14694..e17e1ce 100644 --- a/src/cmd/src/cli/serve_validate.rs +++ b/src/managed-engine/src/cli.rs @@ -1,10 +1,112 @@ use std::collections::HashSet; use std::ffi::OsString; -use clap::CommandFactory as _; use clap::error::ErrorKind; +use clap::{Args, CommandFactory}; -use crate::cli::Cli; +use crate::{ManagedEngineConfig, allocate_handshake_port}; + +/// Managed Python headless-engine CLI arguments. +#[derive(Debug, Clone, Args, PartialEq, Eq)] +pub struct ManagedEngineArgs { + /// Python executable used to launch the managed headless vLLM engine. + #[arg(long, env = "VLLM_RS_PYTHON", default_value = "python3")] + pub python: String, + /// Host/IP used both for the managed-engine handshake endpoint and the + /// frontend-advertised input/output ZMQ socket addresses. + #[arg( + long = "data-parallel-address", + visible_alias = "handshake-host", + default_value = "127.0.0.1" + )] + pub handshake_host: String, + /// Optional TCP port for the managed-engine handshake / data-parallel RPC + /// endpoint. + /// + /// When omitted, the CLI allocates an ephemeral port automatically. + #[arg( + long = "data-parallel-rpc-port", + visible_alias = "handshake-port", + value_parser = clap::value_parser!(u16).range(1..) + )] + pub handshake_port: Option, + /// Number of data parallel replicas across the whole deployment. + #[arg(long, default_value_t = 1)] + pub data_parallel_size: usize, + /// Number of data parallel replicas to run on this node. + #[arg(long)] + pub data_parallel_size_local: Option, + + /// Additional arguments forwarded to `python -m vllm.entrypoints.cli.main + /// serve ...`. + /// + /// Arguments after an explicit `--` are forwarded verbatim. Before `--`, + /// `vllm-rs serve` automatically keeps recognized frontend options on + /// the Rust side and forwards everything else to Python. + #[arg( + last = true, + allow_hyphen_values = true, + help_heading = "Passthrough arguments" + )] + pub python_args: Vec, +} + +impl ManagedEngineArgs { + /// Build the handshake address shared by the Rust frontend and managed + /// Python engine. + pub fn handshake_address(&self, handshake_port: u16) -> String { + format!("tcp://{}:{}", self.handshake_host, handshake_port) + } + + /// Resolve the handshake port, either from the CLI argument (if specified) + /// or by allocating a fresh port. + pub fn resolve_handshake_port(&self) -> anyhow::Result { + self.handshake_port + .map(Ok) + .unwrap_or_else(|| allocate_handshake_port(&self.handshake_host)) + } + + /// Build the managed Python-engine spawn configuration. + pub fn into_config( + self, + model: String, + max_model_len: Option, + handshake_port: u16, + ) -> ManagedEngineConfig { + let mut python_args = self.python_args; + // Manually forward some args to the Python engine. + if let Some(max_model_len) = max_model_len { + python_args.push("--max-model-len".to_string()); + python_args.push(max_model_len.to_string()); + } + if let Some(data_parallel_size_local) = self.data_parallel_size_local { + python_args.push("--data-parallel-size-local".to_string()); + python_args.push(data_parallel_size_local.to_string()); + } + + ManagedEngineConfig { + python: self.python, + model, + handshake_host: self.handshake_host, + handshake_port, + data_parallel_size: self.data_parallel_size, + python_args, + } + } + + /// Return the number of engines that the Rust frontend should expect to + /// coordinate with. + fn local_engine_count(&self) -> usize { + self.data_parallel_size_local.unwrap_or(self.data_parallel_size) + } + + /// Return whether the managed Rust frontend only needs to communicate with + /// colocated engines. + pub fn frontend_local_only(&self) -> bool { + self.data_parallel_size_local != Some(0) + && self.local_engine_count() == self.data_parallel_size + } +} /// Python `argparse` accepts these multi-character single-dash aliases, but /// `clap` cannot model them directly. @@ -28,12 +130,19 @@ const PYTHON_MULTI_CHAR_ALIASES: &[(&str, &str)] = &[ ("-ac", "--attention-config"), ]; -/// Repartition `serve` argv so Rust frontend-owned flags stay before `--`, -/// while everything else is forwarded to Python. -pub(super) fn repartition_serve_args(args: &[OsString]) -> Result, clap::Error> { - if args.get(1).map(|arg| arg.as_os_str()) != Some("serve".as_ref()) { +/// If the given arguments match the specified subcommand, repartition them so +/// Rust-owned flags stay before `--`, while everything else is forwarded to +/// Python. +pub fn repartition_managed_engine_args( + args: &[OsString], + subcommand: &str, +) -> Result, clap::Error> +where + C: CommandFactory, +{ + if !matches_subcommand(args, subcommand) { return Ok(args.to_vec()); - } + }; if args.get(2).is_none() { return Ok(args.to_vec()); @@ -44,12 +153,12 @@ pub(super) fn repartition_serve_args(args: &[OsString]) -> Result, return Ok(args.to_vec()); } if model == "--" || is_option_like(&model) { - return Err(build_missing_model_error()); + return Err(build_missing_model_error::(subcommand)); } - let (front_args, explicit_passthrough, had_separator) = split_serve_args(&args[3..]); + let (front_args, explicit_passthrough, had_separator) = split_managed_engine_args(&args[3..]); let normalized_front_args = normalize_python_arg_aliases(front_args); - let (long_flags, short_flags) = collect_frontend_option_names(); + let (long_flags, short_flags) = collect_frontend_option_names::(subcommand); let mut frontend_chunks = Vec::new(); let mut python_chunks = Vec::new(); @@ -89,7 +198,13 @@ pub(super) fn repartition_serve_args(args: &[OsString]) -> Result, Ok(repartitioned) } -fn split_serve_args(args: &[OsString]) -> (&[OsString], &[OsString], bool) { +fn matches_subcommand(args: &[OsString], subcommand: &str) -> bool { + args.get(1) + .and_then(|arg| arg.to_str()) + .is_some_and(|candidate| candidate == subcommand) +} + +fn split_managed_engine_args(args: &[OsString]) -> (&[OsString], &[OsString], bool) { if let Some(index) = args.iter().position(|arg| arg == "--") { (&args[..index], &args[index + 1..], true) } else { @@ -159,14 +274,18 @@ fn chunk_head_is_frontend_owned( short_flags.contains(&short) } -fn collect_frontend_option_names() -> (HashSet, HashSet) { - let mut command = Cli::command(); - let serve_command = - command.find_subcommand_mut("serve").expect("serve subcommand should exist"); +fn collect_frontend_option_names(subcommand: &str) -> (HashSet, HashSet) +where + C: CommandFactory, +{ + let mut command = C::command(); + let subcommand = command + .find_subcommand_mut(subcommand) + .expect("managed-engine subcommand should exist"); let mut long_flags = HashSet::new(); let mut short_flags = HashSet::new(); - for arg in serve_command.get_arguments() { + for arg in subcommand.get_arguments() { if let Some(names) = arg.get_long_and_visible_aliases() { long_flags.extend(names.into_iter().map(str::to_owned)); } @@ -204,12 +323,17 @@ fn is_help_flag(arg: &str) -> bool { arg == "-h" || arg == "--help" } -fn build_missing_model_error() -> clap::Error { - let mut command = Cli::command(); - let serve_command = - command.find_subcommand_mut("serve").expect("serve subcommand should exist"); - serve_command.error( +fn build_missing_model_error(subcommand: &str) -> clap::Error +where + C: CommandFactory, +{ + let mut command = C::command(); + let subcommand_name = subcommand.to_string(); + let subcommand = command + .find_subcommand_mut(subcommand) + .expect("managed-engine subcommand should exist"); + subcommand.error( ErrorKind::MissingRequiredArgument, - "serve requires the model to appear immediately after the subcommand", + format!("{subcommand_name} requires the model to appear immediately after the subcommand"), ) } diff --git a/src/managed-engine/src/lib.rs b/src/managed-engine/src/lib.rs new file mode 100644 index 0000000..e981210 --- /dev/null +++ b/src/managed-engine/src/lib.rs @@ -0,0 +1,4 @@ +pub mod cli; +mod process; + +pub use process::{ManagedEngineConfig, ManagedEngineHandle, allocate_handshake_port}; diff --git a/src/cmd/src/managed_engine.rs b/src/managed-engine/src/process.rs similarity index 100% rename from src/cmd/src/managed_engine.rs rename to src/managed-engine/src/process.rs From ca3025cdf277eedd0d7b5b5aba888eb3a4c1c328 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 6 May 2026 16:57:47 +0800 Subject: [PATCH 2/3] bump zeromq Signed-off-by: Bugen Zhao --- Cargo.lock | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 4 +-- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9da5315..3305f09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,6 +185,24 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "windows-sys 0.61.2", +] + [[package]] name = "async-openai" version = "0.33.1" @@ -679,6 +697,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -1446,6 +1473,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -2850,6 +2887,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -3050,6 +3093,20 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -5829,6 +5886,17 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" +[[package]] +name = "win_uds" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd30a1a28a3799479cbf4e17284a220ea9ff6bad098a9d0224543a5d1efe1da" +dependencies = [ + "async-io", + "futures-io", + "socket2", +] + [[package]] name = "winapi" version = "0.3.9" @@ -6282,8 +6350,9 @@ checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" [[package]] name = "zeromq" -version = "0.6.0-pre.1" -source = "git+https://github.com/BugenZhao/zmq.rs?rev=dde4ee5c48846b0cc22b1f4fad8c7514748180fe#dde4ee5c48846b0cc22b1f4fad8c7514748180fe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb2c254fd8f366755335c9e43b865f8484fe3bd717d65ffe7c3f28852863030" dependencies = [ "async-trait", "asynchronous-codec", @@ -6301,6 +6370,7 @@ dependencies = [ "tokio", "tokio-util", "uuid", + "win_uds", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index e47175f..fddb52d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,10 +101,10 @@ vllm-metrics = { path = "src/metrics" } vllm-server = { path = "src/server" } vllm-text = { path = "src/text" } winnow = "1.0.2" -zeromq = { git = "https://github.com/BugenZhao/zmq.rs", rev = "dde4ee5c48846b0cc22b1f4fad8c7514748180fe", default-features = false, features = [ +zeromq = { version = "0.6.0", default-features = false, features = [ "tokio-runtime", "all-transport", -] } # TODO: switch to crates.io version after the patch gets released +] } [workspace.lints.clippy] too_many_arguments = "allow" From 98643a81bafad178d20ae94ca8415adcd672c3d9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 6 May 2026 17:18:45 +0800 Subject: [PATCH 3/3] refactor `repartition_managed_engine_args` to accept root level command Signed-off-by: Bugen Zhao --- src/cmd/src/cli.rs | 2 +- src/cmd/src/cli/tests.rs | 2 +- src/managed-engine/src/cli.rs | 78 ++++++++++++++++++++--------------- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/src/cmd/src/cli.rs b/src/cmd/src/cli.rs index 551c89f..17c7fed 100644 --- a/src/cmd/src/cli.rs +++ b/src/cmd/src/cli.rs @@ -50,7 +50,7 @@ impl Cli { T: Into, { let args: Vec = itr.into_iter().map(Into::into).collect(); - let repartitioned_args = repartition_managed_engine_args::(&args, "serve")?; + let repartitioned_args = repartition_managed_engine_args::(&args, Some("serve"))?; ::try_parse_from(&repartitioned_args).inspect(|cli| { if let Command::Serve(serve) = &cli.command && serve.debug_cli diff --git a/src/cmd/src/cli/tests.rs b/src/cmd/src/cli/tests.rs index 61db787..a5a0d12 100644 --- a/src/cmd/src/cli/tests.rs +++ b/src/cmd/src/cli/tests.rs @@ -430,7 +430,7 @@ fn serve_args_reject_flags_before_model() { .unwrap_err(); expect![[r#" - error: serve requires the model to appear immediately after the subcommand + error: the model must appear immediately after the command Usage: vllm-rs serve [OPTIONS] [-- ...] diff --git a/src/managed-engine/src/cli.rs b/src/managed-engine/src/cli.rs index e17e1ce..302737d 100644 --- a/src/managed-engine/src/cli.rs +++ b/src/managed-engine/src/cli.rs @@ -130,35 +130,61 @@ const PYTHON_MULTI_CHAR_ALIASES: &[(&str, &str)] = &[ ("-ac", "--attention-config"), ]; -/// If the given arguments match the specified subcommand, repartition them so -/// Rust-owned flags stay before `--`, while everything else is forwarded to -/// Python. +/// Repartition managed-engine argv so Rust-owned flags stay before `--`, while +/// everything else is forwarded to Python. pub fn repartition_managed_engine_args( args: &[OsString], - subcommand: &str, + subcommand: Option<&str>, ) -> Result, clap::Error> where C: CommandFactory, { - if !matches_subcommand(args, subcommand) { - return Ok(args.to_vec()); + let command = C::command(); + let (prefix, real_args, command) = match subcommand { + Some(subcommand) => { + if !matches_subcommand(args, subcommand) { + return Ok(args.to_vec()); + }; + + let subcommand = command + .find_subcommand(subcommand) + .expect("managed-engine subcommand should exist"); + + (args[..2].to_vec(), &args[2..], subcommand) + } + None => { + let Some(program) = args.first() else { + return Ok(args.to_vec()); + }; + + (vec![program.clone()], &args[1..], &command) + } }; - if args.get(2).is_none() { + let mut repartitioned = prefix; + repartitioned.extend(repartition_real_managed_engine_args(real_args, command)?); + Ok(repartitioned) +} + +fn repartition_real_managed_engine_args( + args: &[OsString], + command: &clap::Command, +) -> Result, clap::Error> { + let Some(model) = args.first() else { return Ok(args.to_vec()); - } + }; - let model = args[2].to_string_lossy(); + let model = model.to_string_lossy(); if is_help_flag(&model) { return Ok(args.to_vec()); } if model == "--" || is_option_like(&model) { - return Err(build_missing_model_error::(subcommand)); + return Err(build_missing_model_error(command)); } - let (front_args, explicit_passthrough, had_separator) = split_managed_engine_args(&args[3..]); + let (long_flags, short_flags) = collect_option_names(command); + let (front_args, explicit_passthrough, had_separator) = split_managed_engine_args(&args[1..]); let normalized_front_args = normalize_python_arg_aliases(front_args); - let (long_flags, short_flags) = collect_frontend_option_names::(subcommand); let mut frontend_chunks = Vec::new(); let mut python_chunks = Vec::new(); @@ -187,7 +213,7 @@ where ); } - let mut repartitioned = vec![args[0].clone(), args[1].clone(), args[2].clone()]; + let mut repartitioned = vec![args[0].clone()]; repartitioned.extend(frontend_chunks); if had_separator || !python_chunks.is_empty() || !explicit_passthrough.is_empty() { repartitioned.push("--".into()); @@ -274,18 +300,10 @@ fn chunk_head_is_frontend_owned( short_flags.contains(&short) } -fn collect_frontend_option_names(subcommand: &str) -> (HashSet, HashSet) -where - C: CommandFactory, -{ - let mut command = C::command(); - let subcommand = command - .find_subcommand_mut(subcommand) - .expect("managed-engine subcommand should exist"); - +fn collect_option_names(command: &clap::Command) -> (HashSet, HashSet) { let mut long_flags = HashSet::new(); let mut short_flags = HashSet::new(); - for arg in subcommand.get_arguments() { + for arg in command.get_arguments() { if let Some(names) = arg.get_long_and_visible_aliases() { long_flags.extend(names.into_iter().map(str::to_owned)); } @@ -323,17 +341,9 @@ fn is_help_flag(arg: &str) -> bool { arg == "-h" || arg == "--help" } -fn build_missing_model_error(subcommand: &str) -> clap::Error -where - C: CommandFactory, -{ - let mut command = C::command(); - let subcommand_name = subcommand.to_string(); - let subcommand = command - .find_subcommand_mut(subcommand) - .expect("managed-engine subcommand should exist"); - subcommand.error( +fn build_missing_model_error(command: &clap::Command) -> clap::Error { + command.clone().error( ErrorKind::MissingRequiredArgument, - format!("{subcommand_name} requires the model to appear immediately after the subcommand"), + "the model must appear immediately after the command", ) }