
feat(bindings/python): smg-as-tokenspeed-dependency surface (working serve_oai)#1406

Draft
yetone wants to merge 8 commits into main from feat/pylib-protocol

Conversation

Collaborator

@yetone yetone commented Apr 28, 2026

Draft. Lays the integration surface for the new direction agreed
with @zhyncs in Slack on 2026-04-27/28: instead of tokenspeed serve
running smg's Rust
gateway as a sidecar/subprocess that talks gRPC (the route
#1351 was building, now Draft / [paused]), tokenspeed will
import smg_rs and call into smg as a Python dependency for its
protocol layer — tokenization, function-call parser, reasoning
parser, OAI server, MCP, response API. Single Python process, no
gRPC, no IPC.

What's in this PR

  • bindings/python/src/serving.rs — three pyfunctions registered
    under the existing smg_rs extension:

    function                                       status
    parse_tool_call_complete(text, parser_name)    ✅ real, calls tool_parser (18 detectors)
    parse_reasoning_complete(text, parser_name)    ✅ real, calls reasoning_parser (13 detectors)
    serve_oai(engine, host, port)                  real now, axum HTTP server in-process
  • bindings/python/Cargo.toml — adds pyo3-async-runtimes,
    axum, serde_json, uuid, tracing, futures deps.

  • bindings/python/tests/test_serving_skeleton.py — pytest
    smoke tests for all three entries.

  • bindings/python/README.md — "two consumption modes" section
    (smg-as-binary vs smg-as-library).

How serve_oai is wired

                  Python process boundary
                          │
ts serve  ─────► smg_rs.serve_oai(async_llm, host, port)
                          │
                          ▼
        pyo3_async_runtimes::tokio::run        ← supervisor thread
        ├─ asyncio.new_event_loop()  (sets up TaskLocals)
        └─ tokio::Runtime
                          │
                          ▼
        async fn { capture locals; build axum router; bind+serve }
                          │
              ┌───────────┴───────────┐
              ▼                       ▼
         POST /v1/chat/completions  (other handlers)
              │
              │ Each handler: tokio::scope(locals.clone(), async {
              │   Python::attach(|py| {
              │     build GenerateReqInput,
              │     call engine.generate_request(),
              │     pyo3_async_runtimes::tokio::into_future(coro)
              │   })?.await
              │ })
              ▼
       OAI ChatCompletion JSON response

Cross-language data marshalling uses a single json.dumps/
json.loads hop in each direction so we don't fight pyo3 0.28's
IntoPyObject trait surface for nested types.
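
A minimal sketch of that hop, assuming pyo3 0.28's Bound API (helper names are illustrative, not the actual serving.rs functions):

use pyo3::prelude::*;
use serde_json::Value;

// Python object -> serde_json::Value: json.dumps on the Python side,
// serde_json::from_str on the Rust side.
fn py_to_value(py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<Value> {
    let s: String = py.import("json")?.call_method1("dumps", (obj,))?.extract()?;
    serde_json::from_str(&s)
        .map_err(|e| pyo3::exceptions::PyValueError::new_err(format!("bad JSON from Python: {e}")))
}

// serde_json::Value -> Python object: stringify on the Rust side, json.loads on the Python side.
fn value_to_py<'py>(py: Python<'py>, v: &Value) -> PyResult<Bound<'py, PyAny>> {
    py.import("json")?.call_method1("loads", (v.to_string(),))
}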

Verified end-to-end on n1

ts-claude-test container, real maturin build of the wheel,
FakeAsyncLLM mock implementing async def generate_request(obj):

=== starting smg_rs.serve_oai on 127.0.0.1:18900 ===
  server is reachable

=== POST /v1/chat/completions ===
{
  "id": "chatcmpl-07dacba9b26d4af2ab0d8eb38016954e",
  "object": "chat.completion",
  "model": "fake-model",
  "choices": [
    {
      "index": 0,
      "message": {"role": "assistant", "content": "echo:hello, smg!"},
      "finish_reason": "stop"
    }
  ],
  "usage": {"prompt_tokens": 5, "completion_tokens": 3, "total_tokens": 8}
}

ALL ASSERTIONS PASSED — axum + pyo3-async-runtimes bridge works

The same shape will land verbatim against a real tokenspeed.runtime.engine.async_llm.AsyncLLM; generate_request is the only method we touch.

Iteration history (commits on this branch)

  1. 11d05d67 initial skeleton (parsers real, serve_oai stub)
  2. 1345449d fix factory.registry().create_parser + Py<PyAny> alias
  3. 467e069a real serve_oai (failed at runtime)
  4. bcb22c35 wire bridge through tokio::run + tokio::scope + get_current_locals (works)
  5. * test refactor (drop stub-error assertion)

Follow-ups out of scope here

  1. Streaming SSE — wraps __anext__ per chunk through pyo3-async-runtimes; needed for stream: true in /v1/chat/completions.
  2. Chat-template render in crates/tokenizer (minijinja) so smg, not tokenspeed, owns the prompt-formatting code path.
  3. Tool / reasoning post-processing in the streaming path so tool-call JSON gets sliced into proper tool_calls chunks.
  4. Other endpoints (/v1/completions, /v1/responses, /v1/embeddings, MCP, response API) per Yineng's "tokenization, function calling, reasoning parser, oai server, mcp, response api" list.

Test plan

  • cargo build (verified via maturin build --release on n1, ~4m30s)
  • Wheel installs cleanly via pip install
  • from smg import smg_rs; smg_rs.parse_tool_call_complete(...) works
  • from smg import smg_rs; smg_rs.parse_reasoning_complete(...) works
  • smg_rs.serve_oai(FakeAsyncLLM, host, port) blocks; POST /v1/chat/completions returns valid OAI ChatCompletion against the fake engine
  • Coexists with tokenspeed in the same process (DEMO 4 of tools/smg_pylib_demo.py)
  • Streaming end-to-end against a real AsyncLLM (follow-up)
  • CI green on bindings/python/tests/test_serving_skeleton.py

yetone added 2 commits April 28, 2026 17:51
Direction agreed with @syuoni on 2026-04-27/28: instead of
`tokenspeed serve` running smg's Rust gateway as a sidecar/subprocess
(the route #1351 was building, now Draft / [paused]),
tokenspeed will import smg's PyO3 module and call into it as a Python
dependency for the protocol layer — tokenization, function-call
parser, reasoning parser, OAI-compatible HTTP server, MCP, response
API. Single Python process, no gRPC, no IPC.

This commit lays the integration surface in the existing
`bindings/python` crate (rather than a new crate, so consumers stay
on a single `import smg_rs`):

* `bindings/python/src/serving.rs` — three new pyfunctions:
  - `parse_tool_call_complete(text, parser_name) -> dict` —
    real implementation, calls the `tool_parser` workspace crate.
  - `parse_reasoning_complete(text, parser_name) -> dict` —
    real implementation, calls the `reasoning_parser` workspace
    crate.
  - `serve_oai(engine, host, port, chat_template=None)` — stub;
    the full version will spin up an axum HTTP server in-process,
    drive the supplied `AsyncLLM` via `pyo3-async-runtimes`, and
    stream parsed tool/reasoning chunks back as SSE. Until then,
    raises `RuntimeError` with a pointer to this comment.

* `bindings/python/src/lib.rs` — registers the new module under
  the existing `smg_rs` extension.

* `bindings/python/tests/test_serving_skeleton.py` — smoke tests
  for all three entries (JSON tool call, Qwen3 thinking block,
  unknown-parser errors, stub error message).

* `bindings/python/README.md` — adds a "two consumption modes"
  section explaining smg-as-binary vs smg-as-library.

Signed-off-by: yetone <yetoneful@gmail.com>
Two fixes against the initial skeleton, surfaced by building the wheel
on the n1 dev box:

1. ParserFactory does not expose create_parser directly — that method
   lives on ParserRegistry. ParserFactory wraps a ParserRegistry; go
   through factory.registry() to reach create_parser.

2. pyo3 0.28 does not re-export PyObject from the prelude. Add a local
   type alias to PyObject = Py<PyAny> instead of fishing imports out
   of pyo3 internals.
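
Sketch of both fixes side by side (illustrative only, using the APIs named above):

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

// Fix 2: pyo3 0.28 no longer re-exports PyObject from the prelude; a local alias
// keeps pyfunction signatures (e.g. serve_oai's `engine: PyObject`) unchanged.
type PyObject = Py<PyAny>;

// Fix 1: create_parser lives on ParserRegistry, reached through factory.registry().
// (The _engine parameter only demonstrates the alias in a signature.)
fn make_tool_parser_or_err(_engine: PyObject, parser_name: &str) -> PyResult<()> {
    let factory = tool_parser::ParserFactory::new();
    let _parser = factory
        .registry()
        .create_parser(parser_name)
        .ok_or_else(|| PyValueError::new_err(format!("unknown tool parser: {parser_name:?}")))?;
    Ok(())
}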

Verified end-to-end on n1 (ts-claude-test container, tokenspeed 0.1.0
+ smg-1.4.1 abi3 wheel), demo at /tmp/smg_pylib_demo.py:

  === DEMO 1: parse_tool_call_complete ===
  {'tool_calls': [{'name': 'get_weather', 'arguments': '...'}], 'normal_text': ''}
    ok: tool_calls extracted, name + args parsed
  === DEMO 2: parse_reasoning_complete (Qwen3 thinking) ===
  {'normal_text': 'The answer is 42.', 'reasoning_text': 'let me think step by step'}
    ok: thinking block split from final answer
  === DEMO 3: serve_oai stub ===
    ok: stub raises with pointer
  === DEMO 4: tokenspeed AND smg_rs in the same process ===
    tokenspeed: 0.1.0
    smg_rs tool parsers   : 18 detectors
    smg_rs reasoning parsers: 13 detectors
  ALL DEMOS PASSED

Signed-off-by: yetone <yetoneful@gmail.com>
@coderabbitai coderabbitai Bot commented Apr 28, 2026

Review skipped: draft detected.

@github-actions github-actions Bot added the documentation, python-bindings, and tests labels Apr 28, 2026
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new 'Python library' consumption mode for SMG, allowing external inference engines to use its protocol layer directly. It adds a new serving module with bindings for tool-call and reasoning parsing, along with a stub for an OAI-compatible HTTP server. Feedback was provided regarding a race condition in the shared Tokio runtime initialization, potential performance improvements by caching parser factories, and an inconsistency between the serve_oai function signature and its documented asynchronous usage in Python.

Comment on lines +39 to +49
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    RT.set(rt)
        .map_err(|_| PyRuntimeError::new_err("tokio runtime initialized twice"))?;
    Ok(RT.get().expect("just set"))
}
Contributor


medium

The current implementation of shared_runtime contains a race condition. If multiple threads call this function simultaneously during initialization, both may attempt to create a Runtime, but only one can successfully call RT.set(rt). The other thread will receive a PyRuntimeError ("tokio runtime initialized twice"). Since once_cell is already a dependency in this project (used in lib.rs), it is recommended to use once_cell::sync::OnceCell::get_or_try_init to ensure thread-safe, single-instance initialization while correctly propagating errors from Runtime::new().

fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new();
    RT.get_or_try_init(|| {
        Runtime::new().map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))
    })
}

Comment on lines +75 to +79
let factory = tool_parser::ParserFactory::new();
let mut parser = factory
    .registry()
    .create_parser(parser_name)
    .ok_or_else(|| PyValueError::new_err(format!("unknown tool parser: {parser_name:?}")))?;
Contributor


medium

Instantiating ParserFactory and creating a parser on every call to parse_tool_call_complete may be inefficient if the detectors involve expensive initialization like regex compilation. Given that this is intended for a high-performance inference engine integration, consider caching the factory or the registry in a static variable (e.g., using OnceLock) to avoid redundant work on every request. If this optimization is applicable to multiple code paths, extract it into a shared helper function to ensure consistency and avoid code duplication.

References
  1. If an optimization is applicable to multiple code paths, extract it into a shared helper function to ensure consistency and avoid code duplication.
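
A minimal sketch of the suggested caching (assumes ParserFactory is cheap to share and Sync; not part of this PR):

use std::sync::OnceLock;

// Build the factory once and reuse it across calls instead of per request.
fn cached_tool_parser_factory() -> &'static tool_parser::ParserFactory {
    static FACTORY: OnceLock<tool_parser::ParserFactory> = OnceLock::new();
    FACTORY.get_or_init(tool_parser::ParserFactory::new)
}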

Comment on lines +170 to +175
fn serve_oai(
    engine: PyObject,
    host: &str,
    port: u16,
    chat_template: Option<&str>,
) -> PyResult<()> {
Contributor


medium

There is a mismatch between the function signature and its intended usage described in the docstring. The docstring shows serve_oai being used with await in Python, which implies it should return a Coroutine or Future. To support await in Python, this should be defined as an async fn (supported by PyO3 0.21+). Additionally, when designing APIs that interact with external specifications like OpenAI, prioritize fail-fast behavior for unknown or new types over silent coercion to ensure spec divergences are surfaced as errors at the edge.

References
  1. When designing APIs that interact with external specifications (like OpenAI), prioritize fail-fast behavior for unknown or new types over silent coercion or #[non_exhaustive] with an Unknown variant.

Comment thread bindings/python/src/serving.rs Outdated
Comment on lines +37 to +49
/// HTTP server (``serve_oai``) will spawn its own multi-threaded runtime
/// when implemented.
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    RT.set(rt)
        .map_err(|_| PyRuntimeError::new_err("tokio runtime initialized twice"))?;
    Ok(RT.get().expect("just set"))
}


🟡 Nit: shared_runtime() has a TOCTOU race on the OnceLock. If two threads both pass the RT.get() check before either calls RT.set(), the loser gets a PyRuntimeError("tokio runtime initialized twice") instead of the perfectly good runtime that the winner already stored.

Today this can't manifest because the #[pyfunction] entries hold the GIL (no py.allow_threads), so calls are serialized. But it becomes a real bug the moment someone wraps the block_on in py.allow_threads for better concurrency, or under free-threaded Python (PEP 703).

Simplest fix — fall back to the already-initialized value on set failure:

Suggested change
/// HTTP server (``serve_oai``) will spawn its own multi-threaded runtime
/// when implemented.
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    RT.set(rt)
        .map_err(|_| PyRuntimeError::new_err("tokio runtime initialized twice"))?;
    Ok(RT.get().expect("just set"))
}

fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    // Another thread may have won the race — that's fine, drop ours and use theirs.
    let _ = RT.set(rt);
    Ok(RT.get().expect("initialized by us or another thread"))
}


@claude claude Bot left a comment


Clean skeleton — parser APIs are used correctly, tests cover the key paths. One nit on a latent OnceLock race in shared_runtime().

0 🔴 Important · 1 🟡 Nit · 0 🟣 Pre-existing

yetone added 2 commits April 29, 2026 01:14
Replaces the serve_oai stub with a working in-process OAI HTTP server
that drives a Python AsyncLLM-shaped engine via PyO3 callbacks.

Architecture per the smg-as-tokenspeed-dependency plan agreed with
@syuoni:
* axum HTTP router exposes POST /v1/chat/completions
* the engine (a Py<PyAny> wrapping tokenspeed AsyncLLM) is held in
  axum state; per request we acquire the GIL just long enough to
  build a GenerateReqInput and call .generate_request()
* the resulting Python async generator is bridged into a Rust Future
  via pyo3-async-runtimes' tokio adapter — axum's tokio reactor
  awaits it without holding the GIL
* a tiny inline async-def helper (`_consume_to_last`) drains the
  generator and returns the final yielded dict, since this first cut
  is non-streaming

Cross-language data exchange uses a JSON-string hop in both
directions:
* Rust serializes sampling-params to a JSON string, Python's
  `json.loads` decodes it into a kwarg dict
* the engine's final response dict is dumped via `json.dumps` and
  parsed by serde_json into the axum response body
That avoids fighting pyo3 0.28's IntoPyObject trait surface for
nested types and is portable across pyo3 minor versions.

Runtime ownership: serve_oai builds its own multi-threaded tokio
runtime, registers it with pyo3-async-runtimes (so into_future
schedules onto it), and blocks the calling Python thread on
`axum::serve`. The GIL is released during the listen loop via
py.detach so request handlers can re-acquire it themselves.

First-cut limitations (all tracked as separate follow-ups):
* non-streaming only; streaming SSE wraps __anext__ per chunk
* skips chat-template rendering — passes messages[-1].content
  straight to the engine
* no tool / reasoning post-processing — the parsers are exposed
  separately as parse_tool_call_complete / parse_reasoning_complete
* only /v1/chat/completions wired

New deps in bindings/python/Cargo.toml:
* pyo3-async-runtimes 0.28 (tokio-runtime feature) for the
  Python-coroutine-to-Rust-Future bridge
* axum, serde, serde_json, futures, tracing — workspace deps,
  already pulled in transitively via smg
* uuid (workspace) for request id generation

Signed-off-by: yetone <yetoneful@gmail.com>
Three iterations on n1 ts-claude-test (ts container, real maturin
build) found the right pyo3-async-runtimes 0.28 incantation. Final
shape:

* serve_oai uses pyo3_async_runtimes::tokio::run as the entry point.
  That sets up the asyncio event loop on a supervisor thread, builds
  a tokio runtime, registers both with the bridge's task-locals, and
  runs the supplied Rust future on tokio. axum::serve never returns,
  so run blocks until process exit.

* Inside that future we capture the active TaskLocals via
  pyo3_async_runtimes::tokio::get_current_locals — this is the
  function that reads the tokio task-local set by run, NOT
  TaskLocals::with_running_loop (which calls Python
  asyncio.get_running_loop() and fails because the asyncio loop is
  on a separate thread).

* AppState carries Arc<TaskLocals> alongside the engine. Each axum
  handler clones them.

* drive_generate wraps its inner future in
  pyo3_async_runtimes::tokio::scope(locals, async { ... }) so the
  TaskLocals are visible to into_future() (which reads from a tokio
  task-local that scope populates). Without this wrapper handlers
  hit RuntimeError: no running event loop because they run as fresh
  tokio tasks with no inherited locals.

* GenerateReqInput.rid is a non-init dataclass field; setattr
  post-construction instead of passing as a kwarg.

* PyModule::from_code via the prelude rather than the explicit
  pyo3::types::PyModule path.
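
Condensed sketch of that shape (not the real serving.rs; the handler body is reduced to a single placeholder generate_request call, and the axum router is omitted):

use pyo3::prelude::*;

fn serve_oai_sketch(py: Python<'_>, engine: Py<PyAny>) -> PyResult<()> {
    // run() owns both loops: asyncio on a supervisor thread, tokio for the server.
    pyo3_async_runtimes::tokio::run(py, async move {
        // Read the TaskLocals that run() stored in the tokio task-local
        // (not TaskLocals::with_running_loop; the asyncio loop is on another thread).
        let locals = Python::attach(|py| pyo3_async_runtimes::tokio::get_current_locals(py))?;
        // Each handler future is wrapped in scope() so into_future() can find the
        // locals even though axum spawns it as a fresh tokio task.
        pyo3_async_runtimes::tokio::scope(locals, async move {
            let fut = Python::attach(|py| {
                // Placeholder call; the real handler builds a GenerateReqInput first.
                let awaitable = engine.bind(py).call_method0("generate_request")?;
                pyo3_async_runtimes::tokio::into_future(awaitable)
            })?;
            let _last_chunk = fut.await?;
            Ok::<_, PyErr>(())
        })
        .await
    })
}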

Verified on n1 with a FakeAsyncLLM mock: a POST /v1/chat/completions
roundtrips through the wheel into Python, drains the async generator,
and returns a valid OAI ChatCompletion response. All demo assertions
pass.

Signed-off-by: yetone <yetoneful@gmail.com>
@github-actions github-actions Bot added the dependencies label Apr 28, 2026
serve_oai no longer raises immediately — it blocks on axum::serve for
the lifetime of the process. Unit-test path just checks the symbol is
exposed with a sensible docstring; end-to-end behavior is covered by
the FakeAsyncLLM demo run on the dev box.

Signed-off-by: yetone <yetoneful@gmail.com>
@yetone yetone changed the title from "[skeleton] feat(bindings/python): smg-as-tokenspeed-dependency surface" to "feat(bindings/python): smg-as-tokenspeed-dependency surface (working serve_oai)" Apr 28, 2026
Comment on lines +10 to +12
The ``serve_oai`` HTTP entry is still a stub; we just assert it raises a
clear error so callers know the integration hook is wired but the body
isn't implemented yet.


🟡 Nit: Module docstring is stale — it still describes serve_oai as a stub that raises an error, but this PR replaces the stub with a real implementation and the test now just checks the symbol is exposed.

Suggested change
The ``serve_oai`` HTTP entry is still a stub; we just assert it raises a
clear error so callers know the integration hook is wired but the body
isn't implemented yet.
The ``serve_oai`` HTTP entry point hosts smg's axum server in-process;
unit tests verify the symbol is exposed (full end-to-end coverage lives
in tools/smg_serve_oai_demo.py).

When the request body has `stream: true`, serve_oai now emits an OAI
`text/event-stream` response instead of the single non-streaming JSON
chunk. Each event is a `chat.completion.chunk` object with a `delta`
field; the final event sets `finish_reason` and a usage block;
`data: [DONE]` terminates per OAI convention.

Mechanics:

* New `stream_response` helper spawns a tokio task scoped under the
  TaskLocals captured at server start. Inside the task it builds a
  `GenerateReqInput(stream=true)`, grabs the resulting Python async
  iterator, and loops calling `__anext__()` -> `into_future` ->
  await -> next chunk. Each chunk is JSON-roundtripped to
  `serde_json::Value`.

* The engine returns `out['text']` cumulatively per the tokenspeed
  contract; we track the previous cumulative text and emit only the
  suffix delta to OAI clients (which expect deltas). On a non-prefix
  refresh we fall back to the full text — the client sees a duplicate
  but no semantic loss.

* `StopAsyncIteration` from `__anext__` ends the loop cleanly via
  `PyErr::is_instance_of::<PyStopAsyncIteration>()`.

* Chunks funnel through a futures `mpsc::unbounded` channel so axum
  can flush them downstream as the client reads. `Sse::keep_alive()`
  keeps idle connections from being culled by intermediate proxies.

* Errors from the engine surface as an SSE `event: error` frame
  rather than tearing down the connection silently.
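
A condensed sketch of that loop (names are illustrative; the real stream_response also emits the role/finish/[DONE] frames and full OAI chunk JSON rather than bare strings):

use futures::channel::mpsc;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::prelude::*;

async fn drain_stream_sketch(agen: Py<PyAny>, tx: mpsc::UnboundedSender<String>) -> PyResult<()> {
    let mut prev = String::new();
    loop {
        // __anext__() returns an awaitable; the bridge turns it into a Rust future.
        let step = Python::attach(|py| {
            let awaitable = agen.bind(py).call_method0("__anext__")?;
            pyo3_async_runtimes::tokio::into_future(awaitable)
        })?;
        let chunk = match step.await {
            Ok(obj) => obj,
            // StopAsyncIteration from the generator ends the stream cleanly.
            Err(e) if Python::attach(|py| e.is_instance_of::<PyStopAsyncIteration>(py)) => break,
            Err(e) => return Err(e),
        };
        // The engine yields cumulative text; forward only the new suffix as the delta.
        let cur: String =
            Python::attach(|py| chunk.bind(py).get_item("text")?.extract::<String>())?;
        let delta = cur.strip_prefix(&prev).unwrap_or(&cur).to_string();
        prev = cur;
        if !delta.is_empty() {
            let _ = tx.unbounded_send(delta);
        }
    }
    Ok(())
}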

Verified on n1 (`ts-claude-test` container) with a FakeStreamingAsyncLLM
that yields six cumulative chunks ("The", "The quick", ...,
"The quick brown fox jumps."). Demo asserts:

* received 9 SSE chunks (1 role + 6 content deltas + 1 finish + [DONE])
* reassembled content == "The quick brown fox jumps."
* final chunk carries finish_reason='stop' and the right usage tokens

Non-streaming path is unchanged.

Tool / reasoning streaming detection (sliced into proper tool_calls
deltas) and chat-template render are still follow-ups; the streaming
path emits raw text deltas of `out['text']` for now.

Signed-off-by: yetone <yetoneful@gmail.com>
Comment thread bindings/python/src/serving.rs Outdated
Comment on lines +495 to +500
// Emit a content chunk only if there is actual delta.
// (The engine occasionally yields metadata-only refreshes;
// pushing an empty-content chunk to OAI clients confuses
// the openai-python stream parser.)
if !delta.is_empty() {
    let _ = tx_inner.unbounded_send(Ok(Event::default().data(


🔴 Important: Every unbounded_send in this task discards its Result with let _ =. When the client disconnects, the rx half is dropped and all sends silently fail — but the spawned task keeps pulling tokens from the engine until the async generator is exhausted.

For an inference server this means a disconnected client still burns GPU compute for the full generation. The fix is to check the send result inside the loop and break on failure:

Suggested change
// Emit a content chunk only if there is actual delta.
// (The engine occasionally yields metadata-only refreshes;
// pushing an empty-content chunk to OAI clients confuses
// the openai-python stream parser.)
if !delta.is_empty() {
    let _ = tx_inner.unbounded_send(Ok(Event::default().data(

if !delta.is_empty() {
    if tx_inner.unbounded_send(Ok(Event::default().data(

…and then after the send:

                    .is_err() {
                        // Receiver dropped — client disconnected.
                        return Ok(());
                    }

The sends outside the loop (initial role chunk, final chunk, [DONE]) are fine to ignore since there's nothing useful to do if they fail at those points — but the hot-loop send should abort early.

let s: String = json_module
    .call_method1("dumps", (chunk_obj.bind(py),))?
    .extract()?;
Ok(serde_json::from_str(&s).unwrap_or(Value::Null))


🟡 Nit: unwrap_or(Value::Null) silently swallows a serde_json parse failure. If json.dumps returned valid JSON but serde_json::from_str rejects it (e.g. a Python-side NaN/Infinity literal that serde's default config rejects), the chunk becomes Null, cur_text becomes "", delta is empty, and the chunk is silently dropped — no log, no error event.

At minimum, log a warning so this doesn't become a silent data-loss mystery in production:

Suggested change
Ok(serde_json::from_str(&s).unwrap_or(Value::Null))
Ok(serde_json::from_str(&s).unwrap_or_else(|e| {
    tracing::warn!(error = %e, "chunk JSON roundtrip failed — dropping chunk");
    Value::Null
}))

Comment on lines +360 to +361
/// Chunks are funneled through a bounded ``mpsc::channel`` so axum
/// can flush them downstream as fast as the client reads.


🟡 Nit: Docstring says "bounded mpsc::channel" but line 370 creates an unbounded channel. Stale from an earlier draft?

Suggested change
/// Chunks are funneled through a bounded ``mpsc::channel`` so axum
/// can flush them downstream as fast as the client reads.
/// Chunks are funneled through an unbounded ``mpsc::channel`` so axum
/// can flush them downstream as fast as the client reads.

serve_oai now takes an optional `chat_template` arg (the jinja
string from a HuggingFace tokenizer_config.json's chat_template
field). When supplied:

* Compiled into an `llm_tokenizer::chat_template::ChatTemplateProcessor`
  at startup (so a malformed template fails fast, not on first
  request).
* Stored in `AppState` alongside the engine and TaskLocals.
* Each `/v1/chat/completions` request renders the full `messages`
  array through it before sending to the engine — that's the
  OAI-correct path and the one that handles assistant / tool / system
  roles, multi-turn context, etc.

When `chat_template=None` the existing single-turn shortcut still
applies (`messages[-1].content` verbatim) so callers that pre-render
their own template don't break.

Renamed the helper `extract_prompt_from_oai` -> `render_prompt_from_oai`
to reflect the new responsibility.

New workspace dep on `llm-tokenizer` (the crate that already powers
smg's gateway-side chat-template render).

Verified on n1 with a synthetic template:

  CHAT_TEMPLATE = (
    "{%- for m in messages %}< |{{ m.role }}|>{{ m.content }}<|/{{ m.role }}|>{%- endfor %}"
    "{%- if add_generation_prompt %}<|assistant|>{%- endif %}"
  )

  4-turn messages [system, user, assistant, user] ->
  rendered prompt as expected, EchoAsyncLLM round-tripped it back,
  test asserted exact match. ALL ASSERTIONS PASSED.

Signed-off-by: yetone <yetoneful@gmail.com>
Comment on lines +312 to +315
let params = llm_tokenizer::chat_template::ChatTemplateParams {
    add_generation_prompt: true,
    ..Default::default()
};


🔴 Important: special_tokens defaults to None here, which means bos_token, eos_token, etc. all become UNDEFINED in the template context. Many popular HuggingFace chat templates (Llama, Mistral, Gemma, …) reference {{ bos_token }} directly without an {% if bos_token is defined %} guard. With minijinja's default lenient undefined behavior, these render as empty strings silently — the BOS/EOS tokens are dropped from the prompt with no warning or error.

This will produce subtly incorrect prompts for most models, degrading output quality or causing tokenization mismatches.

The serve_oai Python API (line 734) accepts the template string but has no way to pass the special tokens that live alongside chat_template in tokenizer_config.json. Consider either:

  1. Adding a special_tokens: Option<HashMap<String, String>> parameter to serve_oai and threading it through, or
  2. Accepting the full tokenizer_config.json content and extracting both the template and special tokens on the Rust side.

Three additions in one cut, all verified end-to-end on the n1 dev box:

1. Streaming-tool-call SSE
-------------------------
serve_oai now takes `tool_parser="json"` (or any of the 18 detector
names from the workspace ParserFactory). When supplied, each engine
chunk's text delta is fed through the parser's stateful
`parse_incremental(chunk, &tools)`; the resulting StreamingParseResult
carves the delta into:

* `normal_text`    -> OAI `delta.content`
* `calls`          -> OAI `delta.tool_calls` array, with the OAI
                       streaming convention preserved: the first
                       chunk for a tool index carries
                       `{id, type, function.name}`; later chunks
                       only ship `{function.arguments}` deltas.

The `tools` array from the request body is parsed into
`Vec<openai_protocol::common::Tool>` and passed to the parser so
emitted tool names can be validated against the request's declared
tools (the json parser stays silent on unknown names — that's why
the previous demo iteration came up empty until tools were threaded
through).

Demo on n1: FakeMixedAsyncLLM yielding `<think>...</think>{json}` in
three cumulative chunks emits exactly the SSE shape openai-python
expects:

  data: {"delta": {"role": "assistant"}}
  data: {"delta": {"reasoning_content": "let me check the weather"}}
  data: {"delta": {"tool_calls": [{"index": 0, "id": "call_...",
                                    "type": "function",
                                    "function": {"name": "get_weather"}}]}}
  data: {"delta": {"tool_calls": [{"index": 0,
                                    "function": {"arguments": "{\"city\":\"SF\"}"}}]}}
  data: {"delta": {}, "finish_reason": "tool_calls", "usage": {...}}
  data: [DONE]

ALL ASSERTIONS PASSED.

2. Streaming-reasoning SSE
--------------------------
Symmetric to (1): `reasoning_parser="qwen3"` (or any of the 13
reasoning detectors) runs BEFORE the tool parser on each chunk.
Reasoning text is emitted as `delta.reasoning_content` (the
OAI extension popularized by DeepSeek-R1 / Qwen3 and accepted by
most modern chat clients). The visible-text remainder flows through
to the tool parser. This composes correctly with (1) — the demo
above runs both in one stream.

Eager validation at `serve_oai` startup: passing an unknown
`tool_parser` or `reasoning_parser` name fails immediately with the
list of valid choices, instead of failing silently per-request.

3. /v1/completions endpoint
---------------------------
Legacy OAI completions endpoint added alongside /v1/chat/completions.
Takes a raw `prompt` string (or array of strings — first one wins,
batch>1 is a follow-up), bypasses the chat-template path, returns
a single `text_completion`-shape JSON. Streaming is intentionally
not branched here for the first cut; modern clients use chat
completions and adding a second SSE path would just duplicate code.

New deps: `openai-protocol` (workspace) for the `Tool` shape the
tool_parser expects.

Signed-off-by: yetone <yetoneful@gmail.com>
Comment on lines +994 to +1000
/// **Current limitations** (all tracked as separate follow-ups):
///
/// * No tool / reasoning streaming post-processing — the parsers are
///   exposed separately as :py:func:`parse_tool_call_complete` and
///   :py:func:`parse_reasoning_complete` for non-streaming use.
/// * Only ``/v1/chat/completions`` is wired; ``/v1/completions``,
///   ``/v1/responses``, ``/v1/embeddings`` etc. land later.


🟡 Nit: Both bullets in this "Current limitations" block are now stale — this push adds tool/reasoning streaming post-processing to stream_response and wires /v1/completions. Suggest updating:

Suggested change
/// **Current limitations** (all tracked as separate follow-ups):
///
/// * No tool / reasoning streaming post-processing — the parsers are
///   exposed separately as :py:func:`parse_tool_call_complete` and
///   :py:func:`parse_reasoning_complete` for non-streaming use.
/// * Only ``/v1/chat/completions`` is wired; ``/v1/completions``,
///   ``/v1/responses``, ``/v1/embeddings`` etc. land later.

/// **Current limitations** (all tracked as separate follow-ups):
///
/// * No streaming for ``/v1/completions`` — only ``/v1/chat/completions``
///   supports ``stream: true``.
/// * ``/v1/responses``, ``/v1/embeddings`` etc. land later.

}
if let Some(name) = reasoning_parser.as_ref() {
    let factory = ::reasoning_parser::ParserFactory::new();
    if !factory.list_parsers().contains(name) {


🟡 Nit: The reasoning_parser factory has a has_parser(&self, name: &str) -> bool method (factory.rs:88), same as the tool_parser factory used on line 1046. Using list_parsers().contains() allocates a Vec<String> just to do a membership test.

Suggested change
if !factory.list_parsers().contains(name) {
if !factory.has_parser(name) {

Comment on lines +851 to +854
async fn completions_handler(
    State(state): State<AppState>,
    Json(body): Json<Value>,
) -> impl IntoResponse {


🟡 Nit: When a client sends {"prompt": "...", "stream": true} here, they get a non-streaming JSON response back with no indication that streaming was requested but ignored. The openai-python client will try to parse the response as SSE and throw a confusing error.

The docstring acknowledges this is intentional for the first cut, but consider at least returning a 400 early if stream: true is set:

if body.get("stream").and_then(Value::as_bool).unwrap_or(false) {
    return (
        StatusCode::BAD_REQUEST,
        Json(json!({"error": "streaming not yet supported for /v1/completions"})),
    ).into_response();
}

That way callers get a clear signal instead of a cryptic parse failure on the client side.
