Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 88 additions & 61 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ tower-http = { version = "0.6.8", features = ["trace"] }
tracing = { version = "0.1.44", features = ["release_max_level_debug"] }
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt"] }
trait-set = "0.3.0"
uuid = { version = "1.22.0", features = ["v4"] }
validator = { version = "0.20.0", features = ["derive"] }
vllm-chat = { path = "src/chat" }
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[toolchain]
channel = "nightly-2026-03-10"
channel = "1.95"
Comment thread
BugenZhao marked this conversation as resolved.
10 changes: 0 additions & 10 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1 @@
unstable_features = true
style_edition = "2024"
comment_width = 100
format_code_in_doc_comments = true
format_macro_matchers = true
normalize_comments = true
normalize_doc_attributes = true
imports_granularity = "Module"
group_imports = "StdExternalCrate"
reorder_impl_items = true
wrap_comments = true
15 changes: 15 additions & 0 deletions rustfmt.unstable.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Optional local formatting profile. CI and pre-commit use rustfmt.toml.
# Apply manually with:
# cargo +nightly fmt -- --config-path rustfmt.unstable.toml

unstable_features = true
style_edition = "2024"
comment_width = 100
format_code_in_doc_comments = true
format_macro_matchers = true
normalize_comments = true
normalize_doc_attributes = true
imports_granularity = "Module"
group_imports = "StdExternalCrate"
reorder_impl_items = true
wrap_comments = true
1 change: 1 addition & 0 deletions src/chat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ thiserror.workspace = true
thiserror-ext.workspace = true
tool-parser.workspace = true
tracing.workspace = true
trait-set.workspace = true
uuid.workspace = true
vllm-engine-core-client.workspace = true
vllm-llm.workspace = true
Expand Down
2 changes: 0 additions & 2 deletions src/chat/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![feature(trait_alias)]

//! Minimal chat facade above [`vllm_text`].
//!
//! This crate keeps the northbound boundary intentionally small:
Expand Down
5 changes: 4 additions & 1 deletion src/chat/src/output/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Once;

use futures::{Stream, StreamExt as _};
use tracing::info;
use trait_set::trait_set;
use vllm_text::tokenizer::DynTokenizer;

use self::reasoning::reasoning_event_stream;
Expand All @@ -23,7 +24,9 @@ use crate::parser::tool::{ToolParser, ToolParserFactory};
use crate::request::{ChatRequest, ChatToolChoice};
use crate::{Error, Result as ChatResult};

trait ContentEventStream = Stream<Item = Result<ContentEvent>> + Send + 'static;
trait_set! {
trait ContentEventStream = Stream<Item = Result<ContentEvent>> + Send + 'static;
}

/// Default request-scoped output processor used by Hugging Face style chat backends.
///
Expand Down
15 changes: 9 additions & 6 deletions src/chat/src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use futures::Stream;
use subenum::subenum;
use trait_set::trait_set;
use uuid::Uuid;
use vllm_text::output::{DecodedLogprobs, DecodedPromptLogprobs, DecodedTextEvent};

Expand Down Expand Up @@ -114,12 +115,14 @@ pub trait ChatOutputProcessor: Send {
/// Trait-object form of [`ChatOutputProcessor`].
pub type DynChatOutputProcessor = Box<dyn ChatOutputProcessor>;

/// Boxed-stream constraint for decoded text updates.
pub(crate) trait DecodedTextEventStream = Stream<Item = Result<DecodedTextEvent>> + Send + 'static;
/// Boxed-stream constraint for internal assistant events.
pub(crate) trait AssistantEventStream = Stream<Item = Result<AssistantEvent>> + Send + 'static;
/// Boxed-stream constraint for public chat events.
pub(crate) trait ChatEventStream = Stream<Item = Result<ChatEvent>> + Send + 'static;
trait_set! {
/// Boxed-stream constraint for decoded text updates.
pub(crate) trait DecodedTextEventStream = Stream<Item = Result<DecodedTextEvent>> + Send + 'static;
/// Boxed-stream constraint for internal assistant events.
pub(crate) trait AssistantEventStream = Stream<Item = Result<AssistantEvent>> + Send + 'static;
/// Boxed-stream constraint for public chat events.
pub(crate) trait ChatEventStream = Stream<Item = Result<ChatEvent>> + Send + 'static;
}
Comment thread
BugenZhao marked this conversation as resolved.

/// Generate the northbound tool-call ID using the OpenAI-style `call_<id>` format.
// TODO: support other ID scheme like Kimi-K2's `functions.{name}:{global_index}`.
Expand Down
5 changes: 4 additions & 1 deletion src/chat/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use futures::Stream;
use trait_set::trait_set;
use vllm_text::{DecodedLogprobs, DecodedPositionLogprobs, DecodedPromptLogprobs};

use crate::FinishReason;
Expand Down Expand Up @@ -118,7 +119,9 @@ impl Stream for ChatEventStream {
}
}

pub trait ChatEventStreamTrait = Stream<Item = Result<ChatEvent>> + Send + 'static;
trait_set! {
pub trait ChatEventStreamTrait = Stream<Item = Result<ChatEvent>> + Send + 'static;
}

#[cfg(test)]
mod tests {
Expand Down
1 change: 1 addition & 0 deletions src/engine-core-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes.workspace = true
enum-as-inner.workspace = true
futures.workspace = true
hex.workspace = true
itertools.workspace = true
parking_lot.workspace = true
rmp-serde.workspace = true
rmpv.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions src/engine-core-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl EngineCoreClient {
.inner
.register_request(request_id.clone(), data_parallel_rank)?;

let result = try {
let result: Result<()> = async {
if let Some(coordinator) = self.coordinator.as_ref() {
let snapshot = coordinator.snapshot();
req.current_wave = snapshot.current_wave;
Expand All @@ -452,7 +452,9 @@ impl EngineCoreClient {
self.inner
.send_to_engine(&engine_id, EngineCoreRequestType::Add, &req)
.await?;
};
Ok(())
}
.await;

// Failed to send the request to the engine, roll back the registration.
if let Err(error) = result {
Expand Down
6 changes: 4 additions & 2 deletions src/engine-core-client/src/client/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub(crate) async fn run_output_dispatcher_loop(
inner: Arc<ClientInner>,
mut output_rx: mpsc::Receiver<Result<EngineCoreOutputs>>,
) {
let Err(error) = try {
let result: Result<()> = async {
loop {
let outputs = match output_rx.recv().await {
Some(outputs) => outputs,
Expand Down Expand Up @@ -350,7 +350,9 @@ pub(crate) async fn run_output_dispatcher_loop(
}
}
}
};
}
.await;
let Err(error) = result else { return };

warn!(error = %error.as_report(), "output dispatcher exiting with error");
inner.close_registries(Arc::new(error));
Expand Down
8 changes: 5 additions & 3 deletions src/engine-core-client/src/coordinator/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ impl ExternalCoordinatorService {
/// Drive the coordinator event loop until either side of the control plane
/// is closed or a fatal error is observed.
pub(crate) async fn run(mut self, inner: Arc<ClientInner>) {
let Err(error) = try {
let result: Result<()> = async {
loop {
tokio::select! {
// Received frontend-originated command from the handle.
command = self.command_rx.recv() => {
let Some(command) = command else {
warn!("external coordinator command channel closed, shutting down service");
return;
return Ok(());
};
self.handle_command(command).await?;
}
Expand All @@ -151,7 +151,9 @@ impl ExternalCoordinatorService {
}
}
}
};
}
.await;
let Err(error) = result else { return };

warn!(
error = %error.as_report(),
Expand Down
10 changes: 6 additions & 4 deletions src/engine-core-client/src/coordinator/inproc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,28 +170,30 @@ impl InProcCoordinatorRunner {
mut output_rx: mpsc::Receiver<Result<EngineCoreOutputs>>,
inner: Arc<ClientInner>,
) {
let Err(error) = try {
let result: Result<()> = async {
loop {
tokio::select! {
// Received frontend-originated command from the handle.
command = self.command_rx.recv() => {
let Some(command) = command else {
warn!("coordinator command channel closed, shutting down coordinator runner");
return;
return Ok(());
};
self.handle_command(command).await?;
}
// Received engine-originated control output from the coordinator socket.
outputs = output_rx.recv() => {
let Some(outputs) = outputs else {
warn!("coordinator output channel closed, shutting down coordinator runner");
return;
return Ok(());
};
self.handle_outputs(outputs?).await?;
}
}
}
};
}
.await;
let Err(error) = result else { return };

warn!(error = %error.as_report(), "coordinator runner exiting with error");
inner.close_registries(Arc::new(error));
Expand Down
3 changes: 0 additions & 3 deletions src/engine-core-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![feature(iterator_try_collect)]
#![feature(try_blocks)]

mod client;
mod coordinator;
mod error;
Expand Down
1 change: 1 addition & 0 deletions src/engine-core-client/src/protocol/logprobs/array.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io::Cursor;

use byteorder::{BigEndian, LittleEndian, NativeEndian, ReadBytesExt};
use itertools::Itertools as _;

use crate::error::{Error, Result, ext_value_decode};
use crate::protocol::logprobs::wire::{WireArrayData, WireNdArray};
Expand Down
1 change: 1 addition & 0 deletions src/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ asynk-strim-attr.workspace = true
axum.workspace = true
futures.workspace = true
http-body.workspace = true
itertools.workspace = true
libc.workspace = true
prost.workspace = true
prost-types.workspace = true
Expand Down
2 changes: 0 additions & 2 deletions src/server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![feature(iterator_try_collect)]

//! Minimal OpenAI-compatible HTTP server above [`vllm_chat`].

mod config;
Expand Down
5 changes: 3 additions & 2 deletions src/server/src/routes/openai/chat_completions/convert.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools as _;
use vllm_chat::{
AssistantContentBlock, AssistantToolCall, ChatContent, ChatContentPart,
ChatMessage as VllmChatMessage, ChatOptions, ChatRequest, ChatTool, ChatToolChoice,
Expand Down Expand Up @@ -49,11 +50,11 @@ pub(crate) fn prepare_chat_request(
.echo
.then(|| extract_last_assistant_content(&request.messages))
.flatten();
let messages = request
let messages: Vec<_> = request
.messages
.into_iter()
.map(convert_message)
.try_collect::<Vec<_>>()?;
.try_collect()?;
let generation_prompt_mode = normalize_generation_prompt_mode(
request.add_generation_prompt,
request.continue_final_message,
Expand Down
5 changes: 3 additions & 2 deletions src/server/src/routes/openai/completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn done_sse_event() -> Event {
#[cfg(test)]
mod tests {
use futures::{StreamExt as _, stream};
use itertools::Itertools as _;
use vllm_text::{
DecodedLogprobs, DecodedPositionLogprobs, DecodedTextEvent, DecodedTokenLogprob,
FinishReason, Finished,
Expand Down Expand Up @@ -540,9 +541,9 @@ mod tests {
.collect::<Vec<_>>()
.await;

let chunks = chunks
let chunks: Vec<_> = chunks
.into_iter()
.try_collect::<Vec<_>>()
.try_collect()
.expect("stream should succeed");

match &chunks[0] {
Expand Down
1 change: 1 addition & 0 deletions src/server/src/routes/openai/utils/logprobs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use itertools::Itertools as _;
use vllm_text::{
CollectedTextOutput, DecodedLogprobs, DecodedPositionLogprobs, DecodedPromptLogprobs,
DecodedTokenLogprob,
Expand Down
2 changes: 2 additions & 0 deletions src/text/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum-as-inner.workspace = true
fastokens.workspace = true
futures.workspace = true
hf-hub.workspace = true
itertools.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand All @@ -22,6 +23,7 @@ thiserror-ext.workspace = true
tiktoken-rs.workspace = true
tokenizers.workspace = true
tracing.workspace = true
trait-set.workspace = true
vllm-engine-core-client.workspace = true
vllm-llm.workspace = true

Expand Down
10 changes: 5 additions & 5 deletions src/text/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![feature(trait_alias)]
#![feature(iterator_try_collect)]

//! Shared text-generation support used by chat and future raw completions.
//!
//! This crate intentionally stays below chat semantics:
Expand All @@ -20,6 +17,7 @@ pub use output::{
DecodedTextEvent, DecodedTokenLogprob, Finished, TextDecodeOptions, TextOutputStreamExt,
};
pub use request::{Prompt, SamplingParams, TextRequest};
use trait_set::trait_set;
use vllm_engine_core_client::EngineCoreClient;
pub use vllm_llm::FinishReason;
use vllm_llm::{GenerateOutputStream, Llm};
Expand All @@ -34,8 +32,10 @@ pub mod output;
mod request;
pub mod tokenizer;

/// Shared streamed text output type used by raw completions and other text-only northbound paths.
pub trait TextOutputStream = Stream<Item = Result<DecodedTextEvent>> + Send + 'static;
trait_set! {
/// Shared streamed text output type used by raw completions and other text-only northbound paths.
pub trait TextOutputStream = Stream<Item = Result<DecodedTextEvent>> + Send + 'static;
}

/// Raw text facade above [`Llm`].
///
Expand Down
1 change: 1 addition & 0 deletions src/text/src/output/logprobs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use vllm_llm::{Logprobs, PositionLogprobs};

Expand Down
Loading