13 changes: 13 additions & 0 deletions bindings/python/README.md
@@ -2,6 +2,19 @@

This directory contains the Python bindings for SMG (Shepherd Model Gateway), built using [maturin](https://github.com/PyO3/maturin) and [PyO3](https://github.com/PyO3/pyo3).

## Two consumption modes

These bindings serve two distinct use cases:

1. **smg as a Python-launched binary** — the historical use case, documented in
   [Quick Start](#quick-start) below. `smg serve` boots a router in-process.
2. **smg as a Python library** — used by inference engines that want smg's
   protocol layer (tokenization, function calling, reasoning parsing, OAI
   server, MCP, response API) without owning the routing/gateway. The first
   integration target is TokenSpeed: `tokenspeed serve` will `import smg_rs`
   and call into the entry points in `src/serving.rs`. See the module
   docstring there for the planned shape, and the sketch below for the
   call shape.
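
A minimal sketch of the library mode's parser entry points. The function
names match what `src/serving.rs` registers today; the payloads and printed
values are illustrative:

```python
import json

import smg_rs

# Non-streaming tool-call parsing. "json" is one of the parser names
# returned by smg_rs.get_available_tool_call_parsers().
result = smg_rs.parse_tool_call_complete(
    '{"name": "get_weather", "arguments": {"city": "SF"}}', "json"
)
call = result["tool_calls"][0]
# "arguments" is the raw JSON string the model emitted.
print(call["name"], json.loads(call["arguments"]))

# Reasoning-block splitting. "qwen3" detects <think>...</think> tags.
split = smg_rs.parse_reasoning_complete(
    "<think>step by step</think>The answer is 42.", "qwen3"
)
print(split["reasoning_text"], "|", split["normal_text"])
```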

## Quick Start

### Installation
3 changes: 3 additions & 0 deletions bindings/python/src/lib.rs
@@ -5,6 +5,8 @@ use pyo3::prelude::*;
use smg::*;
use smg_auth as auth;

mod serving;

// Define the enums with PyO3 bindings
#[pyclass(eq, from_py_object)]
#[derive(Clone, PartialEq, Debug)]
@@ -1274,5 +1276,6 @@ fn smg_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(print_banner, m)?)?;
    m.add_function(wrap_pyfunction!(get_available_tool_call_parsers, m)?)?;
    m.add_function(wrap_pyfunction!(get_available_reasoning_parsers, m)?)?;
    serving::register(m)?;
    Ok(())
}
193 changes: 193 additions & 0 deletions bindings/python/src/serving.rs
@@ -0,0 +1,193 @@
//! ``smg-as-tokenspeed-dependency`` — protocol-layer entry points exposed to
//! Python so an inference engine like TokenSpeed can drop its own
//! tokenization / function-call / reasoning-parser / OAI-server code and
//! call into smg's Rust implementations directly.
//!
//! Direction agreed on 2026-04-27/28 with @syuoni: tokenspeed remains the
//! Python entry (``ts serve``), boots ``AsyncLLM`` as before, and imports
//! this module via PyO3. The Rust side then runs the OAI HTTP layer and
//! drives ``AsyncLLM`` in-process — no gRPC, no IPC, single Python process.
//!
//! This module is the *skeleton*: the parser entry points are real (they
//! re-use the ``tool_parser`` and ``reasoning_parser`` workspace crates as
//! libraries) so callers can verify the integration works end-to-end; the
//! ``serve_oai`` HTTP entry is a stub that raises a clear error message
//! pointing at the follow-up work.
//!
//! See ``crates/protocols``, ``crates/tool_parser``, ``crates/reasoning_parser``,
//! ``crates/tokenizer`` for the full library surface that will land here over
//! the next few iterations.

use std::sync::OnceLock;

use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList};
use pyo3::{Py, PyAny};
use tokio::runtime::Runtime;

/// Re-export of pyo3's owned-Python-object handle. ``Py<PyAny>`` is what
/// pyo3 0.28 uses where older versions exposed ``PyObject`` from the
/// prelude.
type PyObject = Py<PyAny>;

/// Lazily-initialized tokio runtime shared across blocking PyO3 entries.
///
/// One Runtime per process is enough for the read-only parser entries; the
/// HTTP server (``serve_oai``) will spawn its own multi-threaded runtime
/// when implemented.
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    RT.set(rt)
        .map_err(|_| PyRuntimeError::new_err("tokio runtime initialized twice"))?;
    Ok(RT.get().expect("just set"))
}
Comment on lines +51 to +61
Contributor
medium

The current implementation of shared_runtime contains a race condition. If multiple threads call this function simultaneously during initialization, both may attempt to create a Runtime, but only one can successfully call RT.set(rt). The other thread will receive a PyRuntimeError ("tokio runtime initialized twice"). Since once_cell is already a dependency in this project (used in lib.rs), it is recommended to use once_cell::sync::OnceCell::get_or_try_init to ensure thread-safe, single-instance initialization while correctly propagating errors from Runtime::new().

fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: once_cell::sync::OnceCell<Runtime> = once_cell::sync::OnceCell::new();
    RT.get_or_try_init(|| {
        Runtime::new().map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))
    })
}

🟡 Nit: shared_runtime() has a TOCTOU race on the OnceLock. If two threads both pass the RT.get() check before either calls RT.set(), the loser gets a PyRuntimeError("tokio runtime initialized twice") instead of the perfectly good runtime that the winner already stored.

Today this can't manifest because the #[pyfunction] entries hold the GIL (no py.allow_threads), so calls are serialized. But it becomes a real bug the moment someone wraps the block_on in py.allow_threads for better concurrency, or under free-threaded Python (PEP 703).

Simplest fix — fall back to the already-initialized value on set failure:

Suggested change
/// HTTP server (``serve_oai``) will spawn its own multi-threaded runtime
/// when implemented.
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    RT.set(rt)
        .map_err(|_| PyRuntimeError::new_err("tokio runtime initialized twice"))?;
    Ok(RT.get().expect("just set"))
}
fn shared_runtime() -> PyResult<&'static Runtime> {
    static RT: OnceLock<Runtime> = OnceLock::new();
    if let Some(rt) = RT.get() {
        return Ok(rt);
    }
    let rt = Runtime::new()
        .map_err(|e| PyRuntimeError::new_err(format!("failed to start tokio runtime: {e}")))?;
    // Another thread may have won the race — that's fine, drop ours and use theirs.
    let _ = RT.set(rt);
    Ok(RT.get().expect("initialized by us or another thread"))
}


// =====================================================================
// Tool-call parsing
// =====================================================================

/// Parse a complete (non-streaming) model output for tool calls.
///
/// ``parser_name`` selects which detector to run: one of the values returned
/// by :py:func:`get_available_tool_call_parsers` (e.g. ``"llama"``, ``"qwen"``,
/// ``"kimi_k2"``, ``"deepseek_v3"``, ``"json"``, ...).
///
/// Returns a ``dict`` with two keys:
///
/// * ``"normal_text"``: the prose part of the output, with any tool-call
/// payload stripped.
/// * ``"tool_calls"``: a ``list[dict]`` of ``{"name": str, "arguments": str}``
/// entries — ``arguments`` is the raw JSON string the model emitted (callers
/// usually ``json.loads`` it).
///
/// Raises :class:`ValueError` if the parser name is unknown, and
/// :class:`RuntimeError` if the parser fails (malformed payload, partial
/// JSON that the detector chose to reject, etc.).
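///
/// Example (illustrative; payload taken from the ``json`` parser's
/// passthrough behavior, exercised in the test suite)::
///
///     result = smg_rs.parse_tool_call_complete(
///         '{"name": "get_weather", "arguments": {"city": "SF"}}', "json"
///     )
///     result["tool_calls"][0]["name"]  # "get_weather"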
#[pyfunction]
#[pyo3(signature = (output, parser_name))]
fn parse_tool_call_complete(py: Python<'_>, output: &str, parser_name: &str) -> PyResult<PyObject> {
    let factory = tool_parser::ParserFactory::new();
    let mut parser = factory
        .registry()
        .create_parser(parser_name)
        .ok_or_else(|| PyValueError::new_err(format!("unknown tool parser: {parser_name:?}")))?;
Comment on lines +87 to +91
Contributor
medium

Instantiating ParserFactory and creating a parser on every call to parse_tool_call_complete may be inefficient if the detectors involve expensive initialization like regex compilation. Given that this is intended for a high-performance inference engine integration, consider caching the factory or the registry in a static variable (e.g., using OnceLock) to avoid redundant work on every request. If this optimization is applicable to multiple code paths, extract it into a shared helper function to ensure consistency and avoid code duplication.

References
  1. If an optimization is applicable to multiple code paths, extract it into a shared helper function to ensure consistency and avoid code duplication.


    let rt = shared_runtime()?;
    let (remaining, calls) = rt
        .block_on(async { parser.parse_complete(output).await })
        .map_err(|e| PyRuntimeError::new_err(format!("tool parser failed: {e}")))?;

    let dict = PyDict::new(py);
    dict.set_item("normal_text", remaining)?;

    let list = PyList::empty(py);
    for call in calls {
        let item = PyDict::new(py);
        item.set_item("name", call.function.name)?;
        item.set_item("arguments", call.function.arguments)?;
        list.append(item)?;
    }
    dict.set_item("tool_calls", list)?;
    Ok(dict.into())
}

// =====================================================================
// Reasoning parsing
// =====================================================================

/// Detect and split a model output's reasoning block (e.g. ``<think>...</think>``,
/// Qwen3 thinking tags, DeepSeek-R1 reasoning markers) from the user-visible
/// content.
///
/// ``parser_name`` selects the model-family detector — one of the values
/// returned by :py:func:`get_available_reasoning_parsers` (e.g. ``"qwen3"``,
/// ``"deepseek_r1"``, ``"glm45"``, ...).
///
/// Returns a ``dict`` with:
///
/// * ``"normal_text"``: the prose / answer portion, reasoning stripped.
/// * ``"reasoning_text"``: the raw text inside the reasoning block (or
/// empty string if the model didn't emit one).
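///
/// Example (illustrative; input shape mirrors the Qwen3 test case)::
///
///     result = smg_rs.parse_reasoning_complete(
///         "<think>check the units</think>The answer is 42.", "qwen3"
///     )
///     result["reasoning_text"]  # contains "check the units"
///     result["normal_text"]     # contains "The answer is 42."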
#[pyfunction]
#[pyo3(signature = (output, parser_name))]
fn parse_reasoning_complete(py: Python<'_>, output: &str, parser_name: &str) -> PyResult<PyObject> {
    let factory = reasoning_parser::ParserFactory::new();
    let mut parser = factory
        .registry()
        .create_parser(parser_name)
        .ok_or_else(|| {
            PyValueError::new_err(format!("unknown reasoning parser: {parser_name:?}"))
        })?;

    let result = parser
        .detect_and_parse_reasoning(output)
        .map_err(|e| PyRuntimeError::new_err(format!("reasoning parser failed: {e}")))?;

    let dict = PyDict::new(py);
    dict.set_item("normal_text", result.normal_text)?;
    dict.set_item("reasoning_text", result.reasoning_text)?;
    Ok(dict.into())
}

// =====================================================================
// OAI HTTP server (stub)
// =====================================================================

/// Run smg's OAI-compatible HTTP server in-process, driving the supplied
/// engine via PyO3 callbacks.
///
/// Intended call site (``tokenspeed serve``)::
///
///     async_llm = AsyncLLM(server_args)
///     await smg_rs.serve_oai(
///         engine=async_llm,
///         host=args.host,
///         port=args.port,
///         chat_template=args.chat_template,
///     )
///
/// **Currently a stub.** The full implementation will spin up an axum
/// router that:
///
/// 1. Renders chat templates server-side (``llm-tokenizer`` crate).
/// 2. Tokenizes input via the model's HF tokenizer.
/// 3. Builds ``SamplingParams`` from the request.
/// 4. Awaits ``engine.async_generate(input_ids, sampling_params)`` via
/// ``pyo3-async-runtimes``, pulling tokens out as a Rust ``Stream``.
/// 5. Streams parser-detected tool / reasoning chunks back as SSE.
///
/// Tracking: ``crates/pylib`` direction note in CHANGELOG, and the
/// "smg-as-pylib" thread on Slack with @syuoni.
#[pyfunction]
#[pyo3(signature = (engine, host, port, chat_template = None))]
#[allow(unused_variables)]
fn serve_oai(
    engine: PyObject,
    host: &str,
    port: u16,
    chat_template: Option<&str>,
) -> PyResult<()> {
Comment on lines +1010 to +1018
Contributor
medium

There is a mismatch between the function signature and its intended usage described in the docstring. The docstring shows serve_oai being used with await in Python, which implies it should return a Coroutine or Future. To support await in Python, this should be defined as an async fn (supported by PyO3 0.21+). Additionally, when designing APIs that interact with external specifications like OpenAI, prioritize fail-fast behavior for unknown or new types over silent coercion to ensure spec divergences are surfaced as errors at the edge.

References
  1. When designing APIs that interact with external specifications (like OpenAI), prioritize fail-fast behavior for unknown or new types over silent coercion or #[non_exhaustive] with an Unknown variant.

    Err(PyRuntimeError::new_err(
        "serve_oai is not implemented yet — landing in a follow-up commit on \
         feat/pylib-protocol. Integration shape: smg axum HTTP server in-process, \
         driving AsyncLLM via pyo3-async-runtimes. See bindings/python/src/serving.rs \
         doc comment for the planned signature and the call sites that will use it.",
    ))
}

// =====================================================================
// Module wiring
// =====================================================================

pub(crate) fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(parse_tool_call_complete, m)?)?;
    m.add_function(wrap_pyfunction!(parse_reasoning_complete, m)?)?;
    m.add_function(wrap_pyfunction!(serve_oai, m)?)?;
    Ok(())
}
80 changes: 80 additions & 0 deletions bindings/python/tests/test_serving_skeleton.py
@@ -0,0 +1,80 @@
"""Smoke tests for the smg-as-tokenspeed-dependency skeleton.

These verify that the protocol-layer entry points exposed via
:py:mod:`smg_rs.serving` (registered into the top-level ``smg_rs`` module)
work end-to-end with the existing ``tool_parser`` / ``reasoning_parser``
crates — the same code paths tokenspeed will call once it imports smg as
a dependency for tokenization, function calling, reasoning parsing, and
the OAI server.

The ``serve_oai`` HTTP entry is still a stub; we just assert it raises a
clear error so callers know the integration hook is wired but the body
isn't implemented yet.
Comment on lines +10 to +12
🟡 Nit: Module docstring is stale — it still describes serve_oai as a stub that raises an error, but this PR replaces the stub with a real implementation and the test now just checks the symbol is exposed.

Suggested change
The ``serve_oai`` HTTP entry is still a stub; we just assert it raises a
clear error so callers know the integration hook is wired but the body
isn't implemented yet.
The ``serve_oai`` HTTP entry point hosts smg's axum server in-process;
unit tests verify the symbol is exposed (full end-to-end coverage lives
in tools/smg_serve_oai_demo.py).

"""

from __future__ import annotations

import json

import pytest


smg_rs = pytest.importorskip("smg_rs")


# ---------------------------------------------------------------------------
# Tool-call parsing
# ---------------------------------------------------------------------------


def test_parse_tool_call_complete_json() -> None:
    """The ``json`` parser passes raw JSON tool calls through verbatim."""
    payload = '{"name": "get_weather", "arguments": {"city": "SF", "unit": "celsius"}}'
    result = smg_rs.parse_tool_call_complete(payload, "json")
    assert isinstance(result, dict)
    assert "tool_calls" in result
    assert "normal_text" in result
    assert len(result["tool_calls"]) == 1
    call = result["tool_calls"][0]
    assert call["name"] == "get_weather"
    args = json.loads(call["arguments"])
    assert args == {"city": "SF", "unit": "celsius"}


def test_parse_tool_call_complete_unknown_parser_raises() -> None:
    with pytest.raises(ValueError, match="unknown tool parser"):
        smg_rs.parse_tool_call_complete("anything", "definitely-not-a-real-parser")


# ---------------------------------------------------------------------------
# Reasoning parsing
# ---------------------------------------------------------------------------


def test_parse_reasoning_qwen3_thinking_block() -> None:
    """Qwen3 emits ``<think>...</think>`` around reasoning content."""
    text = "<think>let me think step by step</think>The answer is 42."
    result = smg_rs.parse_reasoning_complete(text, "qwen3")
    assert isinstance(result, dict)
    assert result["reasoning_text"].strip() == "let me think step by step"
    assert "42" in result["normal_text"]


def test_parse_reasoning_unknown_parser_raises() -> None:
    with pytest.raises(ValueError, match="unknown reasoning parser"):
        smg_rs.parse_reasoning_complete("anything", "definitely-not-a-real-parser")


# ---------------------------------------------------------------------------
# OAI HTTP server (stub) — just verify the hook is wired.
# ---------------------------------------------------------------------------


def test_serve_oai_stub_raises_with_pointer() -> None:
    """``serve_oai`` will host smg's axum HTTP server in-process, driving
    the supplied engine via PyO3 callbacks. Until that lands, the entry
    point is a stub that raises a RuntimeError pointing at the follow-up.
    """
    sentinel_engine = object()
    with pytest.raises(RuntimeError, match="serve_oai is not implemented yet"):
        smg_rs.serve_oai(engine=sentinel_engine, host="127.0.0.1", port=8000)