diff --git a/README.md b/README.md index 61ca506..b13f7d3 100644 --- a/README.md +++ b/README.md @@ -51,3 +51,8 @@ pip install kimi-agent-sdk ``` Python quick start: [guides/python/quickstart.md](./guides/python/quickstart.md) + +## Examples + +- Go: [examples/go/ralph-loop](./examples/go/ralph-loop), [examples/go/rumor-buster](./examples/go/rumor-buster) +- Python: [examples/python/customized-tools](./examples/python/customized-tools), [examples/python/e2b-sandbox](./examples/python/e2b-sandbox) diff --git a/examples/python/e2b-sandbox/README.md b/examples/python/e2b-sandbox/README.md new file mode 100644 index 0000000..b3f098f --- /dev/null +++ b/examples/python/e2b-sandbox/README.md @@ -0,0 +1,73 @@ +# Example: E2B Sandbox + +This example demonstrates how to run Kimi Agent in an [E2B](https://e2b.dev) cloud sandbox using **KAOS** (Kimi Agent Operating System). + +## What is KAOS? + +**KAOS** is a runtime abstraction layer that decouples agent tools from the underlying execution environment. It defines a standard interface for file system operations, process execution, and path manipulation—allowing the same agent code to run locally or in a cloud sandbox. + +```mermaid +flowchart LR + subgraph Server + App["Your App"] --> SDK["Kimi Agent SDK"] --> CLI["Kimi CLI"] + subgraph Tools["Tools"] + ReadFile["ReadFile"] + WriteFile["WriteFile"] + Shell["Shell"] + end + CLI --- Tools + end + + subgraph E2B["E2B Sandbox"] + FS[("Filesystem")] + SH{{"Shell"}} + end + + ReadFile -->|"E2BKaos.readtext()"| FS + WriteFile -->|"E2BKaos.writetext()"| FS + Shell -->|"E2BKaos.exec()"| SH + + style Tools stroke-dasharray: 5 5 +``` + +By default, tools operate on your local filesystem. With `set_current_kaos(E2BKaos(...))`, all tool calls are transparently routed to the E2B sandbox—no agent code changes required. + +## How It Works + +This example vendors an `E2BKaos` implementation in `e2b_kaos.py`, then installs it via `set_current_kaos` so Kimi CLI tools operate on the E2B sandbox environment: + +```python +from e2b import AsyncSandbox +from kaos import set_current_kaos +from kaos.path import KaosPath +from kimi_agent_sdk import prompt + +# Create an E2B sandbox and install as KAOS backend +sandbox = await AsyncSandbox.create() +set_current_kaos(E2BKaos(sandbox, cwd="/home/user")) + +# Agent tools now operate inside the sandbox +async for msg in prompt("Create a Python project", work_dir=KaosPath("/home/user")): + print(msg.extract_text(), end="") +``` + +## Run + +```sh +cd examples/python/e2b-sandbox +uv sync --reinstall + +# Required +export KIMI_API_KEY=your-api-key +export KIMI_BASE_URL=https://api.moonshot.ai/v1 +export KIMI_MODEL_NAME=kimi-k2-thinking-turbo +export E2B_API_KEY=your-e2b-api-key + +# Optional +export E2B_SANDBOX_ID=... +export KIMI_WORK_DIR=/home/user/kimi-workdir + +uv run main.py +``` + +If `E2B_SANDBOX_ID` is not set, the script creates a sandbox and prints the ID. The sandbox lifecycle is managed outside of the SDK. diff --git a/examples/python/e2b-sandbox/agent.yaml b/examples/python/e2b-sandbox/agent.yaml new file mode 100644 index 0000000..d959ee8 --- /dev/null +++ b/examples/python/e2b-sandbox/agent.yaml @@ -0,0 +1,7 @@ +version: 1 +agent: + extend: default + name: "e2b" + # Grep tool only supports local KAOS; disable it for E2B sandboxes. + exclude_tools: + - "kimi_cli.tools.file:Grep" diff --git a/examples/python/e2b-sandbox/e2b_kaos.py b/examples/python/e2b-sandbox/e2b_kaos.py new file mode 100644 index 0000000..91526b1 --- /dev/null +++ b/examples/python/e2b-sandbox/e2b_kaos.py @@ -0,0 +1,423 @@ +from __future__ import annotations + +import asyncio +import posixpath +import shlex +import stat +from collections.abc import AsyncGenerator, Iterable +from datetime import datetime +from pathlib import PurePosixPath +from typing import TYPE_CHECKING, Literal + +# `e2b` does not ship type hints. +from e2b import ( # type: ignore[import-untyped] + AsyncCommandHandle, + AsyncSandbox, + CommandExitException, + CommandResult, + EntryInfo, + FileType, + NotFoundException, +) + +# `e2b` does not ship type hints. +from e2b.sandbox_async.commands.command import Commands # type: ignore[import-untyped] + +from kaos import AsyncReadable, AsyncWritable, Kaos, KaosProcess, StatResult, StrOrKaosPath +from kaos.path import KaosPath + +if TYPE_CHECKING: + + def type_check(e2b_kaos: E2BKaos) -> None: + _: Kaos = e2b_kaos + + +class _E2BStdin: + def __init__(self, commands: Commands, pid: int) -> None: + self._commands = commands + self._pid = pid + self._closed = False + self._eof_sent = False + self._lock = asyncio.Lock() + self._last_task: asyncio.Task[None] | None = None + + def can_write_eof(self) -> bool: + return True + + def close(self) -> None: + self._closed = True + self.write_eof() + + async def drain(self) -> None: + if self._last_task is not None: + await self._last_task + + def is_closing(self) -> bool: + return self._closed + + async def wait_closed(self) -> None: + await self.drain() + + def write(self, data: bytes) -> None: + if self._closed: + return + text = data.decode("utf-8", errors="replace") + self._last_task = asyncio.create_task(self._send(text)) + + def writelines(self, data: Iterable[bytes], /) -> None: + for chunk in data: + self.write(chunk) + + def write_eof(self) -> None: + if self._eof_sent: + return None + self._eof_sent = True + self._last_task = asyncio.create_task(self._send("\x04")) + return None + + async def _send(self, text: str) -> None: + async with self._lock: + await self._commands.send_stdin(self._pid, text) + + +class _E2BProcess: + def __init__(self, handle: AsyncCommandHandle, commands: Commands) -> None: + self._handle = handle + self._stdout = asyncio.StreamReader() + self._stderr = asyncio.StreamReader() + self._stdin = _E2BStdin(commands, handle.pid) + self.stdin: AsyncWritable = self._stdin + self.stdout: AsyncReadable = self._stdout + self.stderr: AsyncReadable = self._stderr + self._returncode: int | None = None + self._exit_future: asyncio.Future[int] = asyncio.get_running_loop().create_future() + self._monitor_task = asyncio.create_task(self._monitor()) + + @property + def pid(self) -> int: + return self._handle.pid + + @property + def returncode(self) -> int | None: + return self._returncode + + async def wait(self) -> int: + return await self._exit_future + + async def kill(self) -> None: + await self._handle.kill() + + def feed_stdout(self, chunk: str) -> None: + if chunk: + self._stdout.feed_data(chunk.encode("utf-8", "replace")) + + def feed_stderr(self, chunk: str) -> None: + if chunk: + self._stderr.feed_data(chunk.encode("utf-8", "replace")) + + async def _monitor(self) -> None: + exit_code = 1 + try: + result: CommandResult = await self._handle.wait() + exit_code = result.exit_code + except CommandExitException as exc: + exit_code = exc.exit_code + except Exception as exc: + self.feed_stderr(f"[e2b command error] {exc}\n") + finally: + self._returncode = exit_code + self._stdout.feed_eof() + self._stderr.feed_eof() + if not self._exit_future.done(): + self._exit_future.set_result(exit_code) + + +class E2BKaos: + """ + KAOS backend for an existing E2B AsyncSandbox. + + Sandbox lifecycle is managed externally; this class never creates or kills sandboxes. + + Docs: + - E2B user/workdir defaults: https://e2b.dev/docs/template/user-and-workdir + - E2B commands/filesystem: https://e2b.dev/docs/commands / https://e2b.dev/docs/filesystem + """ + + name: str = "e2b" + + def __init__( + self, + sandbox: AsyncSandbox, + *, + home_dir: str = "/home/user", + cwd: str | None = None, + user: str | None = None, + request_timeout: float | None = None, + ) -> None: + self._sandbox = sandbox + self._home_dir = posixpath.normpath(home_dir) + self._cwd = posixpath.normpath(cwd) if cwd is not None else self._home_dir + self._user = user + self._request_timeout = request_timeout + + def pathclass(self) -> type[PurePosixPath]: + return PurePosixPath + + def normpath(self, path: StrOrKaosPath) -> KaosPath: + return KaosPath(posixpath.normpath(str(path))) + + def gethome(self) -> KaosPath: + return KaosPath(self._home_dir) + + def getcwd(self) -> KaosPath: + return KaosPath(self._cwd) + + async def chdir(self, path: StrOrKaosPath) -> None: + abs_path = self._abs_path(path) + try: + info: EntryInfo = await self._sandbox.files.get_info( + abs_path, + user=self._user, + request_timeout=self._request_timeout, + ) + except NotFoundException as exc: + raise FileNotFoundError(abs_path) from exc + if info.type != FileType.DIR: + raise NotADirectoryError(f"{abs_path} is not a directory") + self._cwd = abs_path + + async def stat(self, path: StrOrKaosPath, *, follow_symlinks: bool = True) -> StatResult: + if not follow_symlinks: + raise NotImplementedError("E2BKaos.stat does not support follow_symlinks=False") + abs_path = self._abs_path(path) + try: + info: EntryInfo = await self._sandbox.files.get_info( + abs_path, + user=self._user, + request_timeout=self._request_timeout, + ) + except NotFoundException as exc: + raise FileNotFoundError(abs_path) from exc + mode = self._with_type_bits(info) + mtime = self._to_timestamp(info.modified_time) + return StatResult( + st_mode=mode, + st_ino=0, + st_dev=0, + st_nlink=1, + st_uid=0, + st_gid=0, + st_size=info.size, + st_atime=mtime, + st_mtime=mtime, + st_ctime=mtime, + ) + + async def iterdir(self, path: StrOrKaosPath) -> AsyncGenerator[KaosPath]: + abs_path = self._abs_path(path) + entries: list[EntryInfo] = await self._sandbox.files.list( + abs_path, + depth=1, + user=self._user, + request_timeout=self._request_timeout, + ) + for entry in entries: + if entry.path == abs_path: + continue + yield KaosPath(entry.path) + + async def glob( + self, path: StrOrKaosPath, pattern: str, *, case_sensitive: bool = True + ) -> AsyncGenerator[KaosPath]: + if not case_sensitive: + raise ValueError("Case insensitive glob is not supported in current environment") + abs_path = self._abs_path(path) + entries: list[EntryInfo] = await self._sandbox.files.list( + abs_path, + depth=1, + user=self._user, + request_timeout=self._request_timeout, + ) + for entry in entries: + if entry.path == abs_path: + continue + if self._fnmatch(entry.name, pattern): + yield KaosPath(entry.path) + + async def readbytes(self, path: StrOrKaosPath, n: int | None = None) -> bytes: + abs_path = self._abs_path(path) + data: bytes | bytearray = await self._sandbox.files.read( + abs_path, + format="bytes", + user=self._user, + request_timeout=self._request_timeout, + ) + payload = bytes(data) + return payload if n is None else payload[:n] + + async def readtext( + self, + path: StrOrKaosPath, + *, + encoding: str = "utf-8", + errors: Literal["strict", "ignore", "replace"] = "strict", + ) -> str: + payload = await self.readbytes(path) + return payload.decode(encoding, errors=errors) + + async def readlines( + self, + path: StrOrKaosPath, + *, + encoding: str = "utf-8", + errors: Literal["strict", "ignore", "replace"] = "strict", + ) -> AsyncGenerator[str]: + text = await self.readtext(path, encoding=encoding, errors=errors) + for line in text.splitlines(keepends=True): + yield line + + async def writebytes(self, path: StrOrKaosPath, data: bytes) -> int: + abs_path = self._abs_path(path) + await self._sandbox.files.write( # type: ignore[reportUnknownMemberType] + abs_path, + data, + user=self._user, + request_timeout=self._request_timeout, + ) + return len(data) + + async def writetext( + self, + path: StrOrKaosPath, + data: str, + *, + mode: Literal["w", "a"] = "w", + encoding: str = "utf-8", + errors: Literal["strict", "ignore", "replace"] = "strict", + ) -> int: + abs_path = self._abs_path(path) + payload = data.encode(encoding, errors=errors) + if mode == "a": + payload = await self._read_for_append_bytes(abs_path) + payload + await self._sandbox.files.write( # type: ignore[reportUnknownMemberType] + abs_path, + payload, + user=self._user, + request_timeout=self._request_timeout, + ) + return len(data) + + async def mkdir( + self, path: StrOrKaosPath, parents: bool = False, exist_ok: bool = False + ) -> None: + abs_path = self._abs_path(path) + existing = await self._get_info(abs_path) + if existing is not None: + if not exist_ok: + raise FileExistsError(f"{abs_path} already exists") + if existing.type != FileType.DIR: + raise FileExistsError(f"{abs_path} already exists and is not a directory") + return + + if not parents: + parent = posixpath.dirname(abs_path) or "/" + parent_info = await self._get_info(parent) + if parent_info is None: + raise FileNotFoundError(f"Parent directory {parent} does not exist") + if parent_info.type != FileType.DIR: + raise NotADirectoryError(f"Parent path {parent} is not a directory") + + await self._sandbox.files.make_dir( + abs_path, + user=self._user, + request_timeout=self._request_timeout, + ) + + async def exec(self, *args: str) -> KaosProcess: + if not args: + raise ValueError("At least one argument (the program to execute) is required.") + command = " ".join(shlex.quote(arg) for arg in args) + if self._cwd: + command = f"cd {shlex.quote(self._cwd)} && {command}" + process: _E2BProcess | None = None + stdout_buffer: list[str] = [] + stderr_buffer: list[str] = [] + + def on_stdout(chunk: str) -> None: + if process is None: + stdout_buffer.append(chunk) + else: + process.feed_stdout(chunk) + + def on_stderr(chunk: str) -> None: + if process is None: + stderr_buffer.append(chunk) + else: + process.feed_stderr(chunk) + + handle: AsyncCommandHandle = await self._sandbox.commands.run( + command, + background=True, + envs=None, + user=self._user, + cwd=self._cwd, + on_stdout=on_stdout, + on_stderr=on_stderr, + stdin=True, + timeout=None, + request_timeout=self._request_timeout, + ) + process = _E2BProcess(handle, self._sandbox.commands) + for chunk in stdout_buffer: + process.feed_stdout(chunk) + for chunk in stderr_buffer: + process.feed_stderr(chunk) + return process + + async def _read_for_append_bytes(self, path: str) -> bytes: + try: + existing: bytes | bytearray = await self._sandbox.files.read( + path, + format="bytes", + user=self._user, + request_timeout=self._request_timeout, + ) + except NotFoundException: + return b"" + return bytes(existing) + + async def _get_info(self, path: str) -> EntryInfo | None: + try: + info: EntryInfo = await self._sandbox.files.get_info( + path, + user=self._user, + request_timeout=self._request_timeout, + ) + return info + except NotFoundException: + return None + + def _abs_path(self, path: StrOrKaosPath) -> str: + raw = str(path) + if posixpath.isabs(raw): + return posixpath.normpath(raw) + return posixpath.normpath(posixpath.join(self._cwd, raw)) + + @staticmethod + def _fnmatch(name: str, pattern: str) -> bool: + from fnmatch import fnmatchcase + + return fnmatchcase(name, pattern) + + @staticmethod + def _to_timestamp(value: datetime) -> float: + return value.timestamp() + + @staticmethod + def _with_type_bits(info: EntryInfo) -> int: + mode = info.mode + type_mode = stat.S_IFDIR if info.type == FileType.DIR else stat.S_IFREG + if stat.S_IFMT(mode) == 0: + mode |= type_mode + return mode if mode else type_mode + diff --git a/examples/python/e2b-sandbox/main.py b/examples/python/e2b-sandbox/main.py new file mode 100644 index 0000000..d131161 --- /dev/null +++ b/examples/python/e2b-sandbox/main.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import asyncio +import os +from pathlib import Path + +from e2b import AsyncSandbox +from kaos import reset_current_kaos, set_current_kaos +from kaos.path import KaosPath + +from e2b_kaos import E2BKaos +from kimi_agent_sdk import prompt + + +async def main() -> None: + # Step 1: ensure E2B is configured. + api_key = os.getenv("E2B_API_KEY") + if not api_key: + raise RuntimeError("E2B_API_KEY is required to use E2B sandboxes") + + # Step 2: pick a working directory inside the sandbox. + work_dir_path: str = os.getenv("KIMI_WORK_DIR", DEFAULT_WORK_DIR) + # Step 3: create a sandbox (or connect to one). + sandbox: AsyncSandbox = await _get_sandbox() + print(f"Created sandbox: {sandbox.sandbox_id}") + + # Step 4: install E2B as the KAOS backend for the SDK. + e2b_kaos: E2BKaos = E2BKaos( + sandbox, + cwd=work_dir_path, + ) + token = set_current_kaos(e2b_kaos) + try: + # Step 5: use KaosPath to access the sandbox filesystem. + work_dir: KaosPath = KaosPath(work_dir_path) + await work_dir.mkdir(parents=True, exist_ok=True) + + # Step 6: call the high-level prompt API as usual. + async for msg in prompt( + "You are in a E2B sandbox. Explore the environment. Try all your tools!", + work_dir=work_dir, + agent_file=AGENT_FILE, + yolo=True, + ): + print("─" * 60) + print(msg) + print("─" * 60) + finally: + reset_current_kaos(token) + + +async def _get_sandbox() -> AsyncSandbox: + # Tutorial tip: swap this with a connect flow if you already have a sandbox. + # + # sandbox_id = os.getenv("E2B_SANDBOX_ID") + # if sandbox_id: + # return await AsyncSandbox.connect(sandbox_id) + sandbox: AsyncSandbox = await AsyncSandbox.create( + template=DEFAULT_TEMPLATE, + timeout=DEFAULT_TIMEOUT_SEC, + ) + return sandbox + + +DEFAULT_WORK_DIR = "/home/user/kimi-workdir" +DEFAULT_TEMPLATE = "base" +DEFAULT_TIMEOUT_SEC = 300 +AGENT_FILE = Path(__file__).resolve().with_name("agent.yaml") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/python/e2b-sandbox/pyproject.toml b/examples/python/e2b-sandbox/pyproject.toml new file mode 100644 index 0000000..063f7f0 --- /dev/null +++ b/examples/python/e2b-sandbox/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "kimi-agent-sdk-e2b-sandbox" +version = "0.1.0" +description = "Run Kimi Agent SDK with an E2B-backed KAOS" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "kimi-agent-sdk", + "e2b>=2.10.2", +] + +[tool.uv.sources] +kimi-agent-sdk = { path = "../../../python" }