diff --git a/Cargo.lock b/Cargo.lock index 76eacbc8..8fb8f14c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,7 +167,7 @@ checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -226,7 +226,7 @@ checksum = "81872a8e595e8ceceab71c6ba1f9078e313b452a1e31934e6763ef5d308705e4" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -237,7 +237,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -283,7 +283,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -643,7 +643,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -901,7 +901,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.117", ] [[package]] @@ -914,7 +914,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.117", ] [[package]] @@ -925,7 +925,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core 0.20.11", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -936,7 +936,7 @@ checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" dependencies = [ "darling_core 0.23.0", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -986,7 +986,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -996,7 +996,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" dependencies = [ "derive_builder_core", - "syn", + "syn 2.0.117", ] [[package]] @@ -1016,7 +1016,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "unicode-xid", ] @@ -1060,7 +1060,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1105,7 +1105,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1138,7 +1138,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1158,7 +1158,7 @@ checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1201,7 +1201,7 @@ checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1328,7 +1328,7 @@ checksum = "a0aca10fb742cb43f9e7bb8467c91aa9bcb8e3ffbc6a6f7389bb93ffc920577d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1454,7 +1454,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2085,7 +2085,7 @@ checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2113,7 +2113,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2187,7 +2187,7 @@ checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2517,7 +2517,7 @@ checksum = "e4db6d5580af57bf992f59068d4ea26fd518574ff48d7639b255a36f9de6e7e9" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2650,7 +2650,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2813,7 +2813,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2988,7 +2988,7 @@ checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3096,7 +3096,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.117", ] [[package]] @@ -3136,7 +3136,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3164,7 +3164,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52717f9a02b6965224f95ca2a81e2e0c5c43baacd28ca057577988930b6c3d5b" dependencies = [ "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3187,7 +3187,7 @@ checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3217,7 +3217,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn", + "syn 2.0.117", "tempfile", ] @@ -3231,7 +3231,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3552,7 +3552,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3956,7 +3956,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn", + "syn 2.0.117", ] [[package]] @@ -4065,7 +4065,7 @@ dependencies = [ "darling 0.20.11", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4076,7 +4076,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4087,7 +4087,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4123,7 +4123,7 @@ checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4144,7 +4144,7 @@ checksum = "ec3a1e7d2eadec84deabd46ae061bf480a91a6bce74d25dad375bd656f2e19d8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4187,7 +4187,7 @@ dependencies = [ "darling 0.23.0", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4213,7 +4213,7 @@ checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4382,7 +4382,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4394,7 +4394,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4403,6 +4403,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.117" @@ -4431,7 +4442,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4536,7 +4547,7 @@ dependencies = [ "either", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4547,7 +4558,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4558,7 +4569,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4750,7 +4761,7 @@ checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4879,7 +4890,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4904,7 +4915,7 @@ dependencies = [ "prost-build", "prost-types", "quote", - "syn", + "syn 2.0.117", "tempfile", "tonic-build", ] @@ -4998,7 +5009,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5052,6 +5063,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trait-set" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b79e2e9c9ab44c6d7c20d5976961b47e8f49ac199154daa514b77cd1ab536625" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "transpose" version = "0.2.3" @@ -5364,7 +5386,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5414,6 +5436,7 @@ dependencies = [ "tool-parser", "tracing", "tracing-subscriber", + "trait-set", "uuid", "vllm-engine-core-client", "vllm-llm", @@ -5460,6 +5483,7 @@ dependencies = [ "expect-test", "futures", "hex", + "itertools 0.14.0", "parking_lot", "rmp-serde", "rmpv", @@ -5526,6 +5550,7 @@ dependencies = [ "expect-test", "futures", "http-body", + "itertools 0.14.0", "libc", "prost", "prost-types", @@ -5572,6 +5597,7 @@ dependencies = [ "fastokens", "futures", "hf-hub 0.5.0", + "itertools 0.14.0", "rustc-hash 1.1.0", "serde", "serde_json", @@ -5584,6 +5610,7 @@ dependencies = [ "tokenizers", "tokio", "tracing", + "trait-set", "vllm-engine-core-client", "vllm-llm", ] @@ -5677,7 +5704,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasm-bindgen-shared", ] @@ -5842,7 +5869,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5853,7 +5880,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -6095,7 +6122,7 @@ dependencies = [ "heck", "indexmap 2.13.0", "prettyplease", - "syn", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -6111,7 +6138,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -6190,7 +6217,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -6211,7 +6238,7 @@ checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -6231,7 +6258,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -6294,7 +6321,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index da999b67..5faf22e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,7 @@ tower-http = { version = "0.6.8", features = ["trace"] } tracing = { version = "0.1.44", features = ["release_max_level_debug"] } tracing-futures = { version = "0.2.5", features = ["futures-03"] } tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt"] } +trait-set = "0.3.0" uuid = { version = "1.22.0", features = ["v4"] } validator = { version = "0.20.0", features = ["derive"] } vllm-chat = { path = "src/chat" } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 73b7b963..4933b3ba 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "nightly-2026-03-10" +channel = "1.95" diff --git a/rustfmt.toml b/rustfmt.toml index 2cfdc28e..35011368 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,11 +1 @@ -unstable_features = true style_edition = "2024" -comment_width = 100 -format_code_in_doc_comments = true -format_macro_matchers = true -normalize_comments = true -normalize_doc_attributes = true -imports_granularity = "Module" -group_imports = "StdExternalCrate" -reorder_impl_items = true -wrap_comments = true diff --git a/rustfmt.unstable.toml b/rustfmt.unstable.toml new file mode 100644 index 00000000..5fa0fe10 --- /dev/null +++ b/rustfmt.unstable.toml @@ -0,0 +1,15 @@ +# Optional local formatting profile. CI and pre-commit use rustfmt.toml. +# Apply manually with: +# cargo +nightly fmt -- --config-path rustfmt.unstable.toml + +unstable_features = true +style_edition = "2024" +comment_width = 100 +format_code_in_doc_comments = true +format_macro_matchers = true +normalize_comments = true +normalize_doc_attributes = true +imports_granularity = "Module" +group_imports = "StdExternalCrate" +reorder_impl_items = true +wrap_comments = true diff --git a/src/chat/Cargo.toml b/src/chat/Cargo.toml index 601a6fd7..57ae106a 100644 --- a/src/chat/Cargo.toml +++ b/src/chat/Cargo.toml @@ -24,6 +24,7 @@ thiserror.workspace = true thiserror-ext.workspace = true tool-parser.workspace = true tracing.workspace = true +trait-set.workspace = true uuid.workspace = true vllm-engine-core-client.workspace = true vllm-llm.workspace = true diff --git a/src/chat/src/lib.rs b/src/chat/src/lib.rs index 6486a95b..00ab7a48 100644 --- a/src/chat/src/lib.rs +++ b/src/chat/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(trait_alias)] - //! Minimal chat facade above [`vllm_text`]. //! //! This crate keeps the northbound boundary intentionally small: diff --git a/src/chat/src/output/default/mod.rs b/src/chat/src/output/default/mod.rs index bdcee06b..2f716499 100644 --- a/src/chat/src/output/default/mod.rs +++ b/src/chat/src/output/default/mod.rs @@ -7,6 +7,7 @@ use std::sync::Once; use futures::{Stream, StreamExt as _}; use tracing::info; +use trait_set::trait_set; use vllm_text::tokenizer::DynTokenizer; use self::reasoning::reasoning_event_stream; @@ -23,7 +24,9 @@ use crate::parser::tool::{ToolParser, ToolParserFactory}; use crate::request::{ChatRequest, ChatToolChoice}; use crate::{Error, Result as ChatResult}; -trait ContentEventStream = Stream> + Send + 'static; +trait_set! { + trait ContentEventStream = Stream> + Send + 'static; +} /// Default request-scoped output processor used by Hugging Face style chat backends. /// diff --git a/src/chat/src/output/mod.rs b/src/chat/src/output/mod.rs index fade1480..50a53881 100644 --- a/src/chat/src/output/mod.rs +++ b/src/chat/src/output/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use futures::Stream; use subenum::subenum; +use trait_set::trait_set; use uuid::Uuid; use vllm_text::output::{DecodedLogprobs, DecodedPromptLogprobs, DecodedTextEvent}; @@ -114,12 +115,14 @@ pub trait ChatOutputProcessor: Send { /// Trait-object form of [`ChatOutputProcessor`]. pub type DynChatOutputProcessor = Box; -/// Boxed-stream constraint for decoded text updates. -pub(crate) trait DecodedTextEventStream = Stream> + Send + 'static; -/// Boxed-stream constraint for internal assistant events. -pub(crate) trait AssistantEventStream = Stream> + Send + 'static; -/// Boxed-stream constraint for public chat events. -pub(crate) trait ChatEventStream = Stream> + Send + 'static; +trait_set! { + /// Boxed-stream constraint for decoded text updates. + pub(crate) trait DecodedTextEventStream = Stream> + Send + 'static; + /// Boxed-stream constraint for internal assistant events. + pub(crate) trait AssistantEventStream = Stream> + Send + 'static; + /// Boxed-stream constraint for public chat events. + pub(crate) trait ChatEventStream = Stream> + Send + 'static; +} /// Generate the northbound tool-call ID using the OpenAI-style `call_` format. // TODO: support other ID scheme like Kimi-K2's `functions.{name}:{global_index}`. diff --git a/src/chat/src/stream.rs b/src/chat/src/stream.rs index c2a25719..a9ef663e 100644 --- a/src/chat/src/stream.rs +++ b/src/chat/src/stream.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Stream; +use trait_set::trait_set; use vllm_text::{DecodedLogprobs, DecodedPositionLogprobs, DecodedPromptLogprobs}; use crate::FinishReason; @@ -118,7 +119,9 @@ impl Stream for ChatEventStream { } } -pub trait ChatEventStreamTrait = Stream> + Send + 'static; +trait_set! { + pub trait ChatEventStreamTrait = Stream> + Send + 'static; +} #[cfg(test)] mod tests { diff --git a/src/engine-core-client/Cargo.toml b/src/engine-core-client/Cargo.toml index c7136b62..89c792fd 100644 --- a/src/engine-core-client/Cargo.toml +++ b/src/engine-core-client/Cargo.toml @@ -13,6 +13,7 @@ bytes.workspace = true enum-as-inner.workspace = true futures.workspace = true hex.workspace = true +itertools.workspace = true parking_lot.workspace = true rmp-serde.workspace = true rmpv.workspace = true diff --git a/src/engine-core-client/src/client.rs b/src/engine-core-client/src/client.rs index abb10ed0..1a05f5f9 100644 --- a/src/engine-core-client/src/client.rs +++ b/src/engine-core-client/src/client.rs @@ -434,7 +434,7 @@ impl EngineCoreClient { .inner .register_request(request_id.clone(), data_parallel_rank)?; - let result = try { + let result: Result<()> = async { if let Some(coordinator) = self.coordinator.as_ref() { let snapshot = coordinator.snapshot(); req.current_wave = snapshot.current_wave; @@ -452,7 +452,9 @@ impl EngineCoreClient { self.inner .send_to_engine(&engine_id, EngineCoreRequestType::Add, &req) .await?; - }; + Ok(()) + } + .await; // Failed to send the request to the engine, roll back the registration. if let Err(error) = result { diff --git a/src/engine-core-client/src/client/imp.rs b/src/engine-core-client/src/client/imp.rs index f12fba92..d92a5303 100644 --- a/src/engine-core-client/src/client/imp.rs +++ b/src/engine-core-client/src/client/imp.rs @@ -272,7 +272,7 @@ pub(crate) async fn run_output_dispatcher_loop( inner: Arc, mut output_rx: mpsc::Receiver>, ) { - let Err(error) = try { + let result: Result<()> = async { loop { let outputs = match output_rx.recv().await { Some(outputs) => outputs, @@ -350,7 +350,9 @@ pub(crate) async fn run_output_dispatcher_loop( } } } - }; + } + .await; + let Err(error) = result else { return }; warn!(error = %error.as_report(), "output dispatcher exiting with error"); inner.close_registries(Arc::new(error)); diff --git a/src/engine-core-client/src/coordinator/external.rs b/src/engine-core-client/src/coordinator/external.rs index 9004e886..ed5934c9 100644 --- a/src/engine-core-client/src/coordinator/external.rs +++ b/src/engine-core-client/src/coordinator/external.rs @@ -133,14 +133,14 @@ impl ExternalCoordinatorService { /// Drive the coordinator event loop until either side of the control plane /// is closed or a fatal error is observed. pub(crate) async fn run(mut self, inner: Arc) { - let Err(error) = try { + let result: Result<()> = async { loop { tokio::select! { // Received frontend-originated command from the handle. command = self.command_rx.recv() => { let Some(command) = command else { warn!("external coordinator command channel closed, shutting down service"); - return; + return Ok(()); }; self.handle_command(command).await?; } @@ -151,7 +151,9 @@ impl ExternalCoordinatorService { } } } - }; + } + .await; + let Err(error) = result else { return }; warn!( error = %error.as_report(), diff --git a/src/engine-core-client/src/coordinator/inproc.rs b/src/engine-core-client/src/coordinator/inproc.rs index 577def80..18607740 100644 --- a/src/engine-core-client/src/coordinator/inproc.rs +++ b/src/engine-core-client/src/coordinator/inproc.rs @@ -170,14 +170,14 @@ impl InProcCoordinatorRunner { mut output_rx: mpsc::Receiver>, inner: Arc, ) { - let Err(error) = try { + let result: Result<()> = async { loop { tokio::select! { // Received frontend-originated command from the handle. command = self.command_rx.recv() => { let Some(command) = command else { warn!("coordinator command channel closed, shutting down coordinator runner"); - return; + return Ok(()); }; self.handle_command(command).await?; } @@ -185,13 +185,15 @@ impl InProcCoordinatorRunner { outputs = output_rx.recv() => { let Some(outputs) = outputs else { warn!("coordinator output channel closed, shutting down coordinator runner"); - return; + return Ok(()); }; self.handle_outputs(outputs?).await?; } } } - }; + } + .await; + let Err(error) = result else { return }; warn!(error = %error.as_report(), "coordinator runner exiting with error"); inner.close_registries(Arc::new(error)); diff --git a/src/engine-core-client/src/lib.rs b/src/engine-core-client/src/lib.rs index 20b2568e..914ce874 100644 --- a/src/engine-core-client/src/lib.rs +++ b/src/engine-core-client/src/lib.rs @@ -1,6 +1,3 @@ -#![feature(iterator_try_collect)] -#![feature(try_blocks)] - mod client; mod coordinator; mod error; diff --git a/src/engine-core-client/src/protocol/logprobs/array.rs b/src/engine-core-client/src/protocol/logprobs/array.rs index 65fb0add..79f3a922 100644 --- a/src/engine-core-client/src/protocol/logprobs/array.rs +++ b/src/engine-core-client/src/protocol/logprobs/array.rs @@ -1,6 +1,7 @@ use std::io::Cursor; use byteorder::{BigEndian, LittleEndian, NativeEndian, ReadBytesExt}; +use itertools::Itertools as _; use crate::error::{Error, Result, ext_value_decode}; use crate::protocol::logprobs::wire::{WireArrayData, WireNdArray}; diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml index 7269dde1..1e4f3e10 100644 --- a/src/server/Cargo.toml +++ b/src/server/Cargo.toml @@ -9,6 +9,7 @@ asynk-strim-attr.workspace = true axum.workspace = true futures.workspace = true http-body.workspace = true +itertools.workspace = true libc.workspace = true prost.workspace = true prost-types.workspace = true diff --git a/src/server/src/lib.rs b/src/server/src/lib.rs index 3a9d26d6..0eb555ed 100644 --- a/src/server/src/lib.rs +++ b/src/server/src/lib.rs @@ -1,5 +1,3 @@ -#![feature(iterator_try_collect)] - //! Minimal OpenAI-compatible HTTP server above [`vllm_chat`]. mod config; diff --git a/src/server/src/routes/openai/chat_completions/convert.rs b/src/server/src/routes/openai/chat_completions/convert.rs index d18a72eb..4f2ee1c8 100644 --- a/src/server/src/routes/openai/chat_completions/convert.rs +++ b/src/server/src/routes/openai/chat_completions/convert.rs @@ -1,3 +1,4 @@ +use itertools::Itertools as _; use vllm_chat::{ AssistantContentBlock, AssistantToolCall, ChatContent, ChatContentPart, ChatMessage as VllmChatMessage, ChatOptions, ChatRequest, ChatTool, ChatToolChoice, @@ -49,11 +50,11 @@ pub(crate) fn prepare_chat_request( .echo .then(|| extract_last_assistant_content(&request.messages)) .flatten(); - let messages = request + let messages: Vec<_> = request .messages .into_iter() .map(convert_message) - .try_collect::>()?; + .try_collect()?; let generation_prompt_mode = normalize_generation_prompt_mode( request.add_generation_prompt, request.continue_final_message, diff --git a/src/server/src/routes/openai/completions.rs b/src/server/src/routes/openai/completions.rs index 3a3709a5..180e142f 100644 --- a/src/server/src/routes/openai/completions.rs +++ b/src/server/src/routes/openai/completions.rs @@ -431,6 +431,7 @@ fn done_sse_event() -> Event { #[cfg(test)] mod tests { use futures::{StreamExt as _, stream}; + use itertools::Itertools as _; use vllm_text::{ DecodedLogprobs, DecodedPositionLogprobs, DecodedTextEvent, DecodedTokenLogprob, FinishReason, Finished, @@ -540,9 +541,9 @@ mod tests { .collect::>() .await; - let chunks = chunks + let chunks: Vec<_> = chunks .into_iter() - .try_collect::>() + .try_collect() .expect("stream should succeed"); match &chunks[0] { diff --git a/src/server/src/routes/openai/utils/logprobs.rs b/src/server/src/routes/openai/utils/logprobs.rs index 0826af11..bf54be8a 100644 --- a/src/server/src/routes/openai/utils/logprobs.rs +++ b/src/server/src/routes/openai/utils/logprobs.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use itertools::Itertools as _; use vllm_text::{ CollectedTextOutput, DecodedLogprobs, DecodedPositionLogprobs, DecodedPromptLogprobs, DecodedTokenLogprob, diff --git a/src/text/Cargo.toml b/src/text/Cargo.toml index b5704cac..74231a5e 100644 --- a/src/text/Cargo.toml +++ b/src/text/Cargo.toml @@ -12,6 +12,7 @@ enum-as-inner.workspace = true fastokens.workspace = true futures.workspace = true hf-hub.workspace = true +itertools.workspace = true rustc-hash.workspace = true serde.workspace = true serde_json.workspace = true @@ -22,6 +23,7 @@ thiserror-ext.workspace = true tiktoken-rs.workspace = true tokenizers.workspace = true tracing.workspace = true +trait-set.workspace = true vllm-engine-core-client.workspace = true vllm-llm.workspace = true diff --git a/src/text/src/lib.rs b/src/text/src/lib.rs index 230e3005..66d69201 100644 --- a/src/text/src/lib.rs +++ b/src/text/src/lib.rs @@ -1,6 +1,3 @@ -#![feature(trait_alias)] -#![feature(iterator_try_collect)] - //! Shared text-generation support used by chat and future raw completions. //! //! This crate intentionally stays below chat semantics: @@ -20,6 +17,7 @@ pub use output::{ DecodedTextEvent, DecodedTokenLogprob, Finished, TextDecodeOptions, TextOutputStreamExt, }; pub use request::{Prompt, SamplingParams, TextRequest}; +use trait_set::trait_set; use vllm_engine_core_client::EngineCoreClient; pub use vllm_llm::FinishReason; use vllm_llm::{GenerateOutputStream, Llm}; @@ -34,8 +32,10 @@ pub mod output; mod request; pub mod tokenizer; -/// Shared streamed text output type used by raw completions and other text-only northbound paths. -pub trait TextOutputStream = Stream> + Send + 'static; +trait_set! { + /// Shared streamed text output type used by raw completions and other text-only northbound paths. + pub trait TextOutputStream = Stream> + Send + 'static; +} /// Raw text facade above [`Llm`]. /// diff --git a/src/text/src/output/logprobs.rs b/src/text/src/output/logprobs.rs index f183dd91..d733ff1c 100644 --- a/src/text/src/output/logprobs.rs +++ b/src/text/src/output/logprobs.rs @@ -1,3 +1,4 @@ +use itertools::Itertools as _; use serde::{Deserialize, Serialize}; use vllm_llm::{Logprobs, PositionLogprobs};