Python port of @mariozechner/pi-ai.
Use your ChatGPT Plus/Pro subscription to access GPT models from Python — no API key, no per-token billing. Authenticates via OAuth using your existing ChatGPT account and streams completions directly from ChatGPT's backend.
- Python 3.12+
- uv (recommended) or pip
- A ChatGPT Plus or Pro subscription
uv add pi-ai-py
# or
pip install pi-ai-py

From source:
git clone https://github.com/Xplo8E/piai
cd piai
uv sync

Run once to authenticate. This opens a browser so you can log in with your ChatGPT account.
piai login

Credentials are saved to ~/.piai/auth.json (created automatically). To use a custom path, which is useful for per-project isolation, set the PIAI_AUTH environment variable to any file path.
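As a rough illustration of the lookup described above, the sketch below picks the credentials path: PIAI_AUTH wins when it is set, otherwise the default ~/.piai/auth.json is used. The helper name `resolve_auth_path` is hypothetical and not part of piai's API, and expanding a leading `~` is an assumption; piai's own resolution may differ in such details.

```python
import os
from pathlib import Path

# Default location named in the text above.
DEFAULT_AUTH_PATH = Path.home() / ".piai" / "auth.json"


def resolve_auth_path(env=None) -> Path:
    """Hypothetical helper: return PIAI_AUTH if set and non-empty, else the default."""
    env = os.environ if env is None else env
    override = env.get("PIAI_AUTH")
    if override:
        # Expanding "~" is an assumption, not documented piai behaviour.
        return Path(override).expanduser()
    return DEFAULT_AUTH_PATH
```

Passing the environment as a mapping keeps the helper easy to exercise without mutating `os.environ`.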
piai run "Explain async/await in Python"
piai run "What is 2+2?" --model gpt-5.1
piai run "Summarize this" --system "You are a concise assistant"
piai status # check login state
piai logout

Streams the model response as typed events.
import asyncio
from piai import stream
from piai.types import Context, UserMessage, TextDeltaEvent, DoneEvent
async def main():
    ctx = Context(
        system_prompt="You are a helpful assistant.",
        messages=[UserMessage(content="What is the capital of France?")],
    )
    async for event in stream("gpt-5.1-codex-mini", ctx):
        if isinstance(event, TextDeltaEvent):
            print(event.text, end="", flush=True)
        elif isinstance(event, DoneEvent):
            print()
            print(f"Tokens: {event.message.usage['input']} in, {event.message.usage['output']} out")

asyncio.run(main())

Returns the full AssistantMessage after the response finishes.
from piai import complete
from piai.types import Context, UserMessage
msg = await complete("gpt-5.1-codex-mini", ctx)
print(msg.text) # convenience property — no need to iterate content blocks
print(msg.stop_reason)
print(msg.usage)

Simplest interface: returns the response as a plain string.
from piai import complete_text
text = await complete_text("gpt-5.1-codex-mini", ctx)
print(text)

Append messages to context.messages to keep conversation history:
from piai import complete
from piai.types import Context, UserMessage
ctx = Context(system_prompt="You are a helpful assistant.")
ctx.messages.append(UserMessage(content="My name is Vinay."))
response = await complete("gpt-5.1-codex-mini", ctx)
ctx.messages.append(response)
ctx.messages.append(UserMessage(content="What's my name?"))
response = await complete("gpt-5.1-codex-mini", ctx)
print(response.text)

Define tools with a JSON Schema parameters dict:
import asyncio
from piai import stream
from piai.types import (
    Context, UserMessage, ToolResultMessage, Tool,
    ToolCallEndEvent, TextDeltaEvent, DoneEvent,
)

def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny, 22°C."

async def main():
    ctx = Context(
        system_prompt="You are a helpful assistant with access to weather data.",
        messages=[UserMessage(content="What's the weather in London?")],
        tools=[
            Tool(
                name="get_weather",
                description="Get current weather for a city.",
                parameters={
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            )
        ],
    )

    # First turn: the model calls the tool
    tool_calls = []
    async for event in stream("gpt-5.1-codex-mini", ctx):
        if isinstance(event, ToolCallEndEvent):
            tool_calls.append(event.tool_call)
        elif isinstance(event, DoneEvent):
            ctx.messages.append(event.message)

    # Execute tools and feed results back
    for tc in tool_calls:
        result = get_weather(**tc.input)
        ctx.messages.append(ToolResultMessage(tool_call_id=tc.id, content=result))

    # Second turn: final answer
    async for event in stream("gpt-5.1-codex-mini", ctx):
        if isinstance(event, TextDeltaEvent):
            print(event.text, end="", flush=True)
    print()

asyncio.run(main())

Reasoning models (gpt-5.x) think through problems before responding. piai surfaces this reasoning in real time via events, and accumulates the full text on the final message.
For each reasoning block the model emits:
- ThinkingStartEvent: reasoning block opens
- ThinkingDeltaEvent(thinking): incremental chunks (may be sparse; the backend controls granularity)
- ThinkingEndEvent(thinking): block closes; the thinking field always has the full accumulated text
After the run, result.thinking returns the full reasoning across all turns (None if the model didn't reason).
The backend sometimes skips streaming deltas for short reasoning, so treat ThinkingEndEvent.thinking as the authoritative text for each block.
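A minimal sketch of that rule, assuming only the event names and the `thinking` field described above: deltas are forwarded for live display, but the text recorded for each block comes from the end event. The three dataclasses are local stand-ins so the snippet runs without piai installed, and `collect_reasoning` is a hypothetical helper, not a piai function.

```python
from dataclasses import dataclass


# Local stand-ins that mirror only the names and fields mentioned in the text.
@dataclass
class ThinkingStartEvent:
    pass


@dataclass
class ThinkingDeltaEvent:
    thinking: str


@dataclass
class ThinkingEndEvent:
    thinking: str


def collect_reasoning(events, on_delta=None) -> list[str]:
    """Return one string per reasoning block, trusting the end event over deltas."""
    blocks: list[str] = []
    for event in events:
        if isinstance(event, ThinkingDeltaEvent) and on_delta is not None:
            # Deltas are only used for live display; they may be missing entirely.
            on_delta(event.thinking)
        elif isinstance(event, ThinkingEndEvent):
            # The end event carries the full text even if no deltas were streamed.
            blocks.append(event.thinking)
    return blocks
```

With real piai events the same loop shape should apply, with the stand-ins replaced by the classes from piai.types.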
options = {
"reasoning_effort": "medium", # low / medium / high
"reasoning_summary": "auto", # auto / concise / detailed / off
}

result = await agent(...)
print(result.text) # full response text
if result.thinking:
    print(f"Model reasoned {len(result.thinking)} chars")
    print(result.thinking[:500])

piai has a native MCP client. Pass any MCP server (radare2, IDA Pro, filesystem, web search) and agent() auto-discovers tools, runs the model in a loop, executes tool calls, and returns when the model stops.
import asyncio
from piai import agent
from piai.mcp import MCPServer
from piai.types import Context, UserMessage
ctx = Context(
    system_prompt="You are an expert ARM64 reverse engineer.",
    messages=[UserMessage(content="Analyze /lib/target.so and report all JNI functions.")],
)
result = await agent(
    model_id="gpt-5.1-codex-mini",
    context=ctx,
    mcp_servers=[
        MCPServer.stdio("r2pm -r r2mcp"),               # radare2
        MCPServer.stdio("ida-mcp", name="ida"),         # IDA Pro headless
        MCPServer.http("http://127.0.0.1:13337/mcp"),   # IDA Pro HTTP server
    ],
    options={"reasoning_effort": "medium"},
    max_turns=30,
)
print(result.text)

Transport types:
| Transport | Usage |
|---|---|
| MCPServer.stdio("cmd --args") | Spawns a local subprocess |
| MCPServer.http("http://host/mcp") | Streamable HTTP (modern) |
| MCPServer.sse("http://host/sse") | Legacy SSE transport |
With auth:
MCPServer.http("https://api.example.com/mcp", bearer_token="my-token")
MCPServer.stdio("my-server", env_extra={"API_KEY": "secret"})

Load from TOML config:
# ~/.piai/config.toml
[mcp_servers.r2]
command = "r2pm"
args = ["-r", "r2mcp"]
[mcp_servers.ida]
command = "ida-mcp"
[mcp_servers.ida-http]
url = "http://127.0.0.1:13337/mcp"

from piai.mcp import MCPServer
servers = MCPServer.from_toml("~/.piai/config.toml")
result = await agent(model_id="gpt-5.1-codex-mini", context=ctx, mcp_servers=servers)

All agent() options:
result = await agent(
    model_id="gpt-5.1-codex-mini",
    context=ctx,
    mcp_servers=[...],
    options={"reasoning_effort": "medium"},
    max_turns=20,                     # safety limit on loop iterations
    on_event=my_callback,             # sync or async; called for every StreamEvent
    require_all_servers=False,        # True = raise if any server fails to connect
    connect_timeout=60.0,             # per-server connection timeout in seconds
    tool_result_max_chars=32_000,     # max chars per tool result (prevents context explosion)
)

If you pass both context.tools and mcp_servers, they are merged. MCP tools take priority on name conflicts.
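To make the merge rule concrete, here is a small sketch of one way such a merge could behave. It is not piai's implementation: plain dicts stand in for Tool objects, and `merge_tools` is a hypothetical helper that keys on the tool name and lets MCP definitions overwrite same-named context tools.

```python
def merge_tools(context_tools: list[dict], mcp_tools: list[dict]) -> list[dict]:
    """Merge two tool lists by name; MCP entries replace same-named context tools."""
    merged = {tool["name"]: tool for tool in context_tools}
    # Later assignments overwrite earlier ones, so MCP definitions win on conflicts.
    merged.update({tool["name"]: tool for tool in mcp_tools})
    return list(merged.values())


context_tools = [
    {"name": "add", "source": "context"},
    {"name": "search", "source": "context"},
]
mcp_tools = [
    {"name": "search", "source": "mcp"},
    {"name": "read_file", "source": "mcp"},
]
merged = merge_tools(context_tools, mcp_tools)
```

Here `search` exists in both lists, so the merged list keeps the MCP version while `add` and `read_file` pass through unchanged.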
Use local_handlers to handle tool calls with plain Python functions — no MCP server required:
from piai import agent
from piai.types import Context, Tool, UserMessage
ctx = Context(
    tools=[Tool(name="add", description="Add two numbers", parameters={
        "type": "object",
        "properties": {
            "a": {"type": "number"},
            "b": {"type": "number"},
        },
        "required": ["a", "b"],
    })],
    messages=[UserMessage(content="What is 41 + 1?")],
)
result = await agent(
    model_id="gpt-5.1-codex-mini",
    context=ctx,
    local_handlers={
        "add": lambda a, b: a + b,
    },
)

- Handlers can be sync or async
- local_handlers take priority over MCP when names conflict
- Mix with mcp_servers: some tools handled locally, others via MCP
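The behaviour in the bullets above can be sketched as a small dispatcher. This is an illustration under stated assumptions rather than piai's internals: `call_handler`, `dispatch`, and the `mcp_call` fallback are hypothetical names, and handlers are assumed to receive the tool input as keyword arguments, as the lambda in the example suggests.

```python
import asyncio
import inspect


async def call_handler(handler, tool_input: dict):
    """Call a sync or async handler with keyword arguments and return its result."""
    result = handler(**tool_input)
    # Async handlers return an awaitable; sync handlers return the value directly.
    if inspect.isawaitable(result):
        result = await result
    return result


async def dispatch(name: str, tool_input: dict, local_handlers: dict, mcp_call):
    """Prefer a local handler; otherwise fall back to the (hypothetical) MCP caller."""
    if name in local_handlers:
        return await call_handler(local_handlers[name], tool_input)
    return await mcp_call(name, tool_input)


async def demo():
    async def fake_mcp(name, tool_input):
        # Stand-in for a real MCP round trip.
        return f"mcp:{name}"

    async def multiply(a, b):
        await asyncio.sleep(0)
        return a * b

    handlers = {"add": lambda a, b: a + b, "multiply": multiply}
    return [
        await dispatch("add", {"a": 41, "b": 1}, handlers, fake_mcp),
        await dispatch("multiply", {"a": 2, "b": 3}, handlers, fake_mcp),
        await dispatch("read_file", {"path": "/tmp/x"}, handlers, fake_mcp),
    ]
```

Checking the return value with `inspect.isawaitable` lets one code path serve lambdas, plain functions, and coroutines alike.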
See docs/mcp.md for the full MCP reference.
on_event fires for every event in the stream — text, reasoning, tool calls, and results. Use it to see exactly what the model is thinking and doing in real time.
Stream events (from stream() and agent()):
| Event | Fields | Description |
|---|---|---|
| TextStartEvent | (none) | Model started producing text |
| TextDeltaEvent | text: str | Incremental text chunk |
| TextEndEvent | text: str | Full accumulated text for this block |
| ThinkingStartEvent | (none) | Model started a reasoning block |
| ThinkingDeltaEvent | thinking: str | Incremental reasoning chunk |
| ThinkingEndEvent | thinking: str | Full reasoning text for this block |
| ToolCallStartEvent | tool_call: ToolCall | Model started a tool call |
| ToolCallDeltaEvent | id: str, json_delta: str | Partial tool call arguments |
| ToolCallEndEvent | tool_call: ToolCall | Complete tool call with parsed input |
| DoneEvent | reason: str, message: AssistantMessage | Stream complete |
| ErrorEvent | reason: str, error: AssistantMessage | Stream failed |
Agent loop events (from agent() only):
| Event | Fields | Description |
|---|---|---|
| AgentToolCallEvent | turn: int, tool_name: str, tool_input: dict | Just before a tool is executed |
| AgentToolResultEvent | turn: int, tool_name: str, tool_input: dict, result: str, error: bool | After tool execution completes |
| AgentTurnEndEvent | turn: int, thinking: str \| None, tool_calls: list[ToolCall] | Summary at end of each loop turn |
DoneEvent.reason values: "stop", "length", "tool_use", "error"
import asyncio
from piai import agent
from piai.mcp import MCPServer
from piai.types import (
    Context, UserMessage,
    ThinkingStartEvent, ThinkingDeltaEvent, ThinkingEndEvent,
    AgentToolCallEvent, AgentToolResultEvent, AgentTurnEndEvent,
    TextDeltaEvent,
)

DIM, CYAN, GREEN, YELLOW, RESET = "\033[2m", "\033[36m", "\033[32m", "\033[33m", "\033[0m"

def on_event(event):
    if isinstance(event, ThinkingStartEvent):
        print(f"\n{DIM}💭 Thinking...{RESET}", flush=True)
    elif isinstance(event, ThinkingDeltaEvent):
        print(f"{DIM}{event.thinking}{RESET}", end="", flush=True)
    elif isinstance(event, ThinkingEndEvent):
        # .thinking always has the full text even if deltas were sparse
        print(f"\n{DIM}[thinking done]{RESET}\n", flush=True)
    elif isinstance(event, AgentToolCallEvent):
        args = ", ".join(
            f"{k}={v[:60]!r}..." if isinstance(v, str) and len(v) > 60 else f"{k}={v!r}"
            for k, v in event.tool_input.items()
        )
        print(f"\n{CYAN}🔧 Turn {event.turn} → {event.tool_name}({args}){RESET}", flush=True)
    elif isinstance(event, AgentToolResultEvent):
        status = "❌" if event.error else "✅"
        preview = event.result[:200].replace("\n", " ")
        print(f"{GREEN}{status} {preview}{'...' if len(event.result) > 200 else ''}{RESET}", flush=True)
    elif isinstance(event, AgentTurnEndEvent):
        note = f", {len(event.thinking)} chars reasoning" if event.thinking else ""
        print(f"\n{YELLOW}── Turn {event.turn}: {len(event.tool_calls)} call(s){note} ──{RESET}\n", flush=True)
    elif isinstance(event, TextDeltaEvent):
        print(event.text, end="", flush=True)

async def main():
    ctx = Context(
        system_prompt="You are an expert reverse engineer.",
        messages=[UserMessage(content="Analyze /lib/target.so")],
    )
    result = await agent(
        model_id="gpt-5.1-codex-mini",
        context=ctx,
        mcp_servers=[MCPServer.stdio("r2pm -r r2mcp")],
        options={"reasoning_effort": "medium"},
        max_turns=30,
        on_event=on_event,
    )
    print(f"\nDone. {len(result.text)} chars.")
    if result.thinking:
        print(f"Total reasoning: {len(result.thinking)} chars")

asyncio.run(main())

The on_event callback can be async; piai awaits it automatically. You can write to websockets, queues, or any async sink directly.
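As one possible shape for such an async sink, the sketch below forwards events into an asyncio.Queue that a separate consumer task drains. Plain strings stand in for piai events, and the loop that calls `on_event` is a stand-in for piai awaiting the callback; `run_with_queue` and the `None` sentinel are illustrative choices, not part of the library.

```python
import asyncio


async def run_with_queue(events):
    """Feed events through an async callback into a queue and drain it concurrently."""
    queue: asyncio.Queue = asyncio.Queue()
    received = []

    async def on_event(event):
        # An async callback can await its sink directly (here, a queue).
        await queue.put(event)

    async def consumer():
        while True:
            event = await queue.get()
            if event is None:  # sentinel: the producer is finished
                break
            received.append(event)

    task = asyncio.create_task(consumer())
    for event in events:  # stand-in for piai invoking the callback once per event
        await on_event(event)
    await queue.put(None)
    await task
    return received
```

A queue decouples the model stream from slow consumers such as websockets: the callback returns as soon as the event is enqueued.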
See the examples/ directory for complete runnable examples including radare2, IDA Pro, LangChain, and LangGraph supervisor.
options = {
"session_id": "my-session", # enables prompt caching across calls
"reasoning_effort": "high", # low / medium / high — how hard the model thinks
"reasoning_summary": "auto", # auto / concise / detailed / off — thinking verbosity
"text_verbosity": "medium", # low / medium / high
}
temperature is not supported by the ChatGPT backend; the API returns an error if you pass it.
Any model your ChatGPT Plus/Pro subscription can access. Common ones:
- gpt-5.1-codex-mini: fast, default
- gpt-5.1: more capable
- gpt-5.1-codex-max
- gpt-5.2, gpt-5.2-codex
- gpt-5.3-codex, gpt-5.3-codex-spark
- gpt-5.4
The model ID is passed directly to the backend — use whatever appears in ChatGPT's model picker.
PiAIChatModel is a drop-in LangChain BaseChatModel backed by piai. Works anywhere LangChain accepts a chat model.
pip install "pi-ai-py[langgraph]"

from piai.langchain import PiAIChatModel
from langchain_core.messages import HumanMessage
llm = PiAIChatModel(
    model_name="gpt-5.1-codex-mini",
    options={"reasoning_effort": "medium"},
)

# Sync invoke
result = llm.invoke([HumanMessage(content="What is 2+2?")])
print(result.content)

# Reasoning is surfaced via additional_kwargs
if result.additional_kwargs.get("thinking"):
    print(f"Model thought: {result.additional_kwargs['thinking'][:200]}...")

# Async stream: get thinking deltas live
async for chunk in llm.astream([HumanMessage(content="Solve step by step: 23 * 47")]):
    if chunk.additional_kwargs.get("thinking_delta"):
        print(chunk.additional_kwargs["thinking_delta"], end="", flush=True)
    elif chunk.content:
        print(chunk.content, end="", flush=True)

# Tool binding
llm_with_tools = llm.bind_tools([my_tool])
result = llm_with_tools.invoke([HumanMessage(content="Use the tool")])

piai integrates with LangGraph for multi-agent workflows.
Convert any MCP server into LangChain BaseTool instances for use in LangGraph agents:
from piai.mcp import MCPServer
from piai.mcp.langchain_tools import MCPHubToolset
from piai.langchain import PiAIChatModel
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
llm = PiAIChatModel(model_name="gpt-5.1-codex-mini")
servers = [MCPServer.stdio("npx -y @modelcontextprotocol/server-filesystem /tmp")]
async with MCPHubToolset(servers) as tools:
    agent = create_agent(model=llm, tools=tools, system_prompt="You are a file explorer.")
    result = await agent.ainvoke({"messages": [HumanMessage(content="List files in /tmp")]})
    print(result["messages"][-1].content)

Wrap a full piai agent() (with its own model, MCP servers, and observability) as a single callable tool for a supervisor:
from piai.langchain import SubAgentTool, PiAIChatModel
from piai.mcp import MCPServer
from langgraph_supervisor import create_supervisor
from langchain_core.messages import HumanMessage
file_agent = SubAgentTool(
    name="file_agent",
    description="Reads, writes, and analyses files.",
    model_id="gpt-5.1-codex-mini",
    system_prompt="You are a file management specialist.",
    mcp_servers=[MCPServer.stdio("npx -y @modelcontextprotocol/server-filesystem /tmp")],
    max_turns=8,
    on_event=on_event,  # full inner-loop observability on the sub-agent
)
workflow = create_supervisor(
    tools=[file_agent],
    model=PiAIChatModel(model_name="gpt-5.1-codex-mini"),
    prompt="You are a supervisor. Delegate tasks to specialists.",
).compile()
result = await workflow.ainvoke({"messages": [HumanMessage(content="Summarize /tmp/app.py")]})
print(result["messages"][-1].content)

See examples/langgraph_supervisor_agent.py for the full runnable example.
Credentials are stored at ~/.piai/auth.json (created automatically on first login).
To use a different path — for per-project isolation or CI environments — set PIAI_AUTH:
PIAI_AUTH=/path/to/project/auth.json piai login
PIAI_AUTH=/path/to/project/auth.json python my_script.py

{
  "openai-codex": {
    "refresh": "<refresh_token>",
    "access": "<access_token>",
    "expires": 1234567890000,
    "accountId": "<account_id>"
  }
}

Never commit auth.json to version control.
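For scripts that want to inspect the file shown above, here is a hedged sketch of an expiry check. It assumes the `expires` field is milliseconds since the Unix epoch, which is an inference from the 13-digit sample value and is not stated by the project. `is_expired` and the 60-second safety margin are illustrative, and piai refreshes tokens on its own, so this is only for diagnostics.

```python
import json
import time


def is_expired(auth_json: str, provider: str = "openai-codex",
               now_ms=None, skew_ms: int = 60_000) -> bool:
    """Return True if the stored access token is at or near expiry.

    Assumes `expires` is milliseconds since the Unix epoch (an inference).
    """
    creds = json.loads(auth_json)[provider]
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    # Treat tokens inside the safety margin as already expired.
    return now_ms >= creds["expires"] - skew_ms


sample = json.dumps({
    "openai-codex": {
        "refresh": "<refresh_token>",
        "access": "<access_token>",
        "expires": 1234567890000,
        "accountId": "<account_id>",
    }
})
```

The check reads only the `expires` field, so it can run without ever logging or printing the tokens themselves.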
mcp_server.py in the project root exposes piai's complete_text() as an MCP tool — so any MCP-compatible client (Claude Code, Cursor, Windsurf) can query ChatGPT Plus through your subscription.
Register in Claude Code (~/.claude/claude_desktop_config.json):
{
  "mcpServers": {
    "piai": {
      "command": "/path/to/.venv/bin/python",
      "args": ["/path/to/piai/mcp_server.py"]
    }
  }
}

Register in Cursor / Windsurf (MCP settings, same format).
Run standalone:
python mcp_server.py

Once registered, the complete tool is available in your editor. Pass any query and, optionally, a model ID:
complete(query="Explain ROP chains", model="gpt-5.1")
Available models: gpt-5.1-codex-mini (default), gpt-5.1, gpt-5.1-codex-max, gpt-5.2, gpt-5.2-codex, gpt-5.3-codex, gpt-5.3-codex-spark, gpt-5.4
src/piai/
├── __init__.py                 # Public API: stream, complete, complete_text, agent, MCPServer
├── types.py                    # Context, messages, stream events
├── stream.py                   # Entry points with auth handling
├── agent.py                    # Autonomous agentic loop with MCP support
├── cli.py                      # CLI commands
├── mcp/
│   ├── server.py               # MCPServer config (stdio/http/sse + from_toml)
│   ├── client.py               # MCPClient: persistent session per server
│   ├── hub.py                  # MCPHub: multi-server manager
│   └── langchain_tools.py      # MCP → LangChain tool bridge (MCPHubToolset)
├── langchain/
│   ├── chat_model.py           # PiAIChatModel: LangChain BaseChatModel adapter
│   └── sub_agent_tool.py       # SubAgentTool: piai agent as LangChain BaseTool
├── oauth/
│   ├── pkce.py                 # PKCE verifier/challenge (RFC 7636)
│   ├── types.py                # OAuthCredentials, OAuthProviderInterface
│   ├── storage.py              # auth.json read/write
│   ├── openai_codex.py         # ChatGPT Plus OAuth login + refresh
│   └── __init__.py             # Provider registry + get_oauth_api_key()
└── providers/
    ├── message_transform.py    # Context → OpenAI Responses API format
    └── openai_codex.py         # SSE streaming to chatgpt.com/backend-api
uv run pytest tests/ -v

Built on top of @mariozechner/pi-mono, the original JS SDK that reverse-engineered the ChatGPT OAuth flow and backend API. The Python port follows the same provider architecture and auth design.
LangChain and LangGraph integrations use LangChain and LangGraph by LangChain AI.