Skip to content

Commit 424dd15

Browse files
committed
Phase 4.2-4.4: require-auth + API key docs, rate limit, access logs, Parquet export
- RUNSTREAM_REQUIRE_AUTH: POST requires Bearer; 503 if key unset when required - .env.example + compose defaults; Docker POST auth documented in README - AccessLogMiddleware + optional RateLimitMiddleware (RUNSTREAM_ENABLE_RATE_LIMIT) - runstream serve configures runstream.access logging - runstream export-parquet + optional [parquet] extra; pyarrow in [dev] for CI - Tests: auth misconfig, rate limit 429, Parquet roundtrip + empty DB Made-with: Cursor
1 parent 8441022 commit 424dd15

File tree

12 files changed

+287
-8
lines changed

12 files changed

+287
-8
lines changed

.env.example

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Copy to .env and adjust. Docker Compose reads RUNSTREAM_API_KEY from the environment.
2+
3+
# Required when RUNSTREAM_REQUIRE_AUTH=1 (recommended for any exposed deployment)
4+
RUNSTREAM_API_KEY=generate-a-long-random-secret
5+
6+
# Set to 1 in production / Docker to force POST /runs to require Bearer token
7+
RUNSTREAM_REQUIRE_AUTH=1
8+
9+
# Optional: per-IP sliding window (requests per minute, default 120)
10+
# RUNSTREAM_ENABLE_RATE_LIMIT=1
11+
# RUNSTREAM_RATE_LIMIT_RPM=120
12+
13+
# Optional: disable one-line app access logs (default: logs enabled when using runstream serve)
14+
# RUNSTREAM_DISABLE_ACCESS_LOG=1

README.md

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ Same body shape as `meta.json` / `RunRecord`. Idempotent via SHA-256 of canonica
3232
curl -s -X POST http://127.0.0.1:8000/runs -H "Content-Type: application/json" -d @fixtures/example_run/meta.json
3333
```
3434

35-
If `RUNSTREAM_API_KEY` is set, send `Authorization: Bearer <key>`.
35+
- **Dev (default):** if only `RUNSTREAM_API_KEY` is set, send `Authorization: Bearer <key>` on POST.
36+
- **Production (recommended):** set `RUNSTREAM_REQUIRE_AUTH=1` and **`RUNSTREAM_API_KEY`** — POST without a valid Bearer returns 401/503. Docker Compose enables this by default; override the key via env (see `.env.example`).
3637

3738
### LLM + tools (optional)
3839

@@ -51,6 +52,21 @@ runstream watch fixtures/example_run --db runstream.db --debounce 2
5152

5253
On `meta.json` create/modify under the watched tree, ingest re-runs after **debounce** seconds (default 2). Ctrl+C stops.
5354

55+
### Rate limit & access logs (API)
56+
57+
When **`RUNSTREAM_ENABLE_RATE_LIMIT=1`**, each client IP is limited to **`RUNSTREAM_RATE_LIMIT_RPM`** requests per sliding 60s window (default **120**). `/health`, `/docs`, `/openapi.json`, and `/redoc` are exempt.
58+
59+
With **`runstream serve`**, one-line access logs go to stderr under the logger **`runstream.access`**. Set **`RUNSTREAM_DISABLE_ACCESS_LOG=1`** to turn them off.
60+
61+
### Parquet export
62+
63+
```bash
64+
pip install 'runstream[parquet]' # or dev extra already includes pyarrow
65+
runstream export-parquet --db runstream.db --out runs.parquet
66+
```
67+
68+
Writes the SQLite `runs` table as Parquet (JSON fields remain JSON strings).
69+
5470
### Cron (scheduled ingest)
5571

5672
`ingest-once` is idempotent; suitable for cron:
@@ -66,10 +82,13 @@ On `meta.json` create/modify under the watched tree, ingest re-runs after **debo
6682
```bash
6783
mkdir data
6884
runstream ingest-once fixtures/example_run --db data/runstream.db
85+
export RUNSTREAM_API_KEY="$(python -c 'import secrets; print(secrets.token_urlsafe(32))')"
6986
docker compose build
7087
docker compose up
7188
```
7289

90+
Compose sets **`RUNSTREAM_REQUIRE_AUTH=1`** by default; POST `/runs` needs `Authorization: Bearer $RUNSTREAM_API_KEY`. Copy [`.env.example`](.env.example) to tune variables.
91+
7392
API listens on port **8000**; the DB file is `./data/runstream.db` mounted at `/data/runstream.db`.
7493

7594
---
@@ -94,16 +113,16 @@ API listens on port **8000**; the DB file is `./data/runstream.db` mounted at `/
94113
| FastAPI: `GET /health`, `/runs`, `/runs/{id}`, **`POST /runs`** | Done |
95114
| OpenAI tools: `search_runs`, `get_run``tools.py` / `execute_tool` | Done |
96115
| `ask_with_llm` + mock regression test | Done |
97-
| Pytest (18 tests) | Done |
116+
| Pytest (CI) | Done |
98117
| GitHub Actions CI | Done |
99118
| Dockerfile + docker-compose | Done |
100119

101-
### Phase 4 (in progress)
120+
### Phase 4
102121

103122
1. ~~Watch / cron~~**done** (`runstream watch`, cron uses `ingest-once`).
104-
2. **Auth default-on**next: `RUNSTREAM_REQUIRE_AUTH` + compose defaults.
105-
3. **Rate limit + access logs** — then.
106-
4. **Parquet export** — then.
123+
2. ~~Auth default-on~~**done**: `RUNSTREAM_REQUIRE_AUTH` + Docker Compose + `.env.example`.
124+
3. ~~Rate limit + access logs~~**done**: `RUNSTREAM_ENABLE_RATE_LIMIT`, `RUNSTREAM_RATE_LIMIT_RPM` (default 120), `runstream.access` logs (disable with `RUNSTREAM_DISABLE_ACCESS_LOG=1`).
125+
4. ~~Parquet export~~**done**: `runstream export-parquet --db … --out runs.parquet` (`pip install 'runstream[parquet]'`).
107126

108127
---
109128

@@ -117,6 +136,8 @@ src/runstream/
117136
tools.py # OpenAI tool schemas + execute_tool (no shell)
118137
ask.py # optional LLM loop
119138
api.py
139+
http_middleware.py # access logs + optional rate limit
140+
parquet_export.py
120141
cli.py
121142
watch_ingest.py
122143
schemas/

docker-compose.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,9 @@ services:
88
- ./data:/data
99
environment:
1010
RUNSTREAM_DB: /data/runstream.db
11+
# Production-style defaults: override RUNSTREAM_API_KEY in shell or .env
12+
RUNSTREAM_REQUIRE_AUTH: "1"
13+
RUNSTREAM_API_KEY: ${RUNSTREAM_API_KEY:-change-me-before-exposing-this-port}
14+
# Optional hardening (uncomment in .env)
15+
# RUNSTREAM_ENABLE_RATE_LIMIT: "1"
16+
# RUNSTREAM_RATE_LIMIT_RPM: "120"

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ dependencies = [
1818

1919
[project.optional-dependencies]
2020
llm = ["openai>=1.40"]
21-
dev = ["pytest>=8.0", "httpx>=0.27", "openai>=1.40"]
21+
parquet = ["pyarrow>=14.0"]
22+
dev = ["pytest>=8.0", "httpx>=0.27", "openai>=1.40", "pyarrow>=14.0"]
2223

2324
[project.scripts]
2425
runstream = "runstream.cli:main"

src/runstream/api.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,39 @@
66
from fastapi import FastAPI, HTTPException, Header, Query
77
from fastapi.responses import JSONResponse
88

9+
from .http_middleware import AccessLogMiddleware, RateLimitMiddleware
910
from .ingest import ingest_record
1011
from .models import RunRecord
1112
from .store import connect, get_run, list_runs
1213

1314
app = FastAPI(title="Runstream", version="0.2.0")
15+
app.add_middleware(RateLimitMiddleware)
16+
app.add_middleware(AccessLogMiddleware)
1417

1518

1619
def _db_path() -> Path:
1720
return Path(os.environ.get("RUNSTREAM_DB", "runstream.db")).resolve()
1821

1922

23+
def _env_truthy(name: str) -> bool:
24+
return os.getenv(name, "").strip().lower() in ("1", "true", "yes", "on")
25+
26+
2027
def _require_write_auth(authorization: str | None) -> None:
2128
expected = os.environ.get("RUNSTREAM_API_KEY")
29+
require = _env_truthy("RUNSTREAM_REQUIRE_AUTH")
30+
if require:
31+
if not expected:
32+
raise HTTPException(
33+
status_code=503,
34+
detail="RUNSTREAM_API_KEY must be set when RUNSTREAM_REQUIRE_AUTH=1",
35+
)
36+
if not authorization or not authorization.startswith("Bearer "):
37+
raise HTTPException(status_code=401, detail="Authorization Bearer token required")
38+
token = authorization[7:].strip()
39+
if token != expected:
40+
raise HTTPException(status_code=403, detail="invalid token")
41+
return
2242
if not expected:
2343
return
2444
if not authorization or not authorization.startswith("Bearer "):
@@ -72,7 +92,8 @@ def create_run(
7292
) -> dict:
7393
"""
7494
Upsert a run (same validation + idempotency as file ingest).
75-
If env RUNSTREAM_API_KEY is set, require `Authorization: Bearer <key>`.
95+
If `RUNSTREAM_REQUIRE_AUTH=1`, Bearer token is **always** required and `RUNSTREAM_API_KEY` must be set.
96+
Otherwise, if only `RUNSTREAM_API_KEY` is set, Bearer is required for POST.
7697
"""
7798
_require_write_auth(authorization)
7899
action = ingest_record(record, _db_path(), source_path=None)

src/runstream/cli.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from .ask import ask_with_llm
99
from .ingest import ingest_path
10+
from .parquet_export import export_runs_parquet
1011
from .tools import openai_tools_json
1112
from .watch_ingest import watch_and_ingest
1213

@@ -23,6 +24,16 @@ def ingest_once(
2324
typer.echo(f"ingest complete -> {db}: {stats}")
2425

2526

27+
@app.command("export-parquet")
28+
def export_parquet_cmd(
29+
out: Path = typer.Option(..., "--out", help="Output .parquet file path"),
30+
db: Path = typer.Option(Path("runstream.db"), "--db", help="SQLite database path"),
31+
) -> None:
32+
"""Export all rows from the runs table to Parquet (requires pip install 'runstream[parquet]')."""
33+
n = export_runs_parquet(db, out)
34+
typer.echo(f"exported {n} rows -> {out}")
35+
36+
2637
@app.command("watch")
2738
def watch_cmd(
2839
path: Path = typer.Argument(..., exists=True, help="Directory (or file under a directory) to watch"),
@@ -40,9 +51,16 @@ def serve(
4051
port: int = typer.Option(8000, "--port"),
4152
) -> None:
4253
"""Run HTTP API (FastAPI + uvicorn)."""
54+
import logging
4355
import os
4456

4557
os.environ["RUNSTREAM_DB"] = str(db.resolve())
58+
access = logging.getLogger("runstream.access")
59+
if not access.handlers:
60+
_h = logging.StreamHandler()
61+
_h.setFormatter(logging.Formatter("%(levelname)s [access] %(message)s"))
62+
access.addHandler(_h)
63+
access.setLevel(logging.INFO)
4664
uvicorn.run(
4765
"runstream.api:app",
4866
host=host,

src/runstream/http_middleware.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import os
5+
import time
6+
from collections import defaultdict
7+
8+
from starlette.middleware.base import BaseHTTPMiddleware
9+
from starlette.requests import Request
10+
from starlette.responses import JSONResponse, Response
11+
12+
logger = logging.getLogger("runstream.access")
13+
14+
# Shared bucket for rate limiting (cleared in tests via clear_rate_limit_state_for_tests).
15+
_rate_limit_hits: dict[str, list[float]] = defaultdict(list)
16+
17+
18+
def clear_rate_limit_state_for_tests() -> None:
19+
_rate_limit_hits.clear()
20+
21+
22+
def _env_truthy(name: str) -> bool:
23+
return os.getenv(name, "").strip().lower() in ("1", "true", "yes", "on")
24+
25+
26+
class AccessLogMiddleware(BaseHTTPMiddleware):
27+
"""Structured-ish one-line access log (disable with RUNSTREAM_DISABLE_ACCESS_LOG=1)."""
28+
29+
async def dispatch(self, request: Request, call_next) -> Response: # type: ignore[no-untyped-def]
30+
if _env_truthy("RUNSTREAM_DISABLE_ACCESS_LOG"):
31+
return await call_next(request)
32+
t0 = time.perf_counter()
33+
response = await call_next(request)
34+
ms = (time.perf_counter() - t0) * 1000
35+
client = request.client.host if request.client else "-"
36+
logger.info(
37+
"%s %s %s %d %.2fms",
38+
client,
39+
request.method,
40+
request.url.path,
41+
response.status_code,
42+
ms,
43+
)
44+
return response
45+
46+
47+
class RateLimitMiddleware(BaseHTTPMiddleware):
48+
"""
49+
Sliding window per client IP when RUNSTREAM_ENABLE_RATE_LIMIT=1.
50+
RUNSTREAM_RATE_LIMIT_RPM (default 120). /health is exempt.
51+
"""
52+
53+
def _rpm(self) -> int:
54+
try:
55+
return max(1, int(os.getenv("RUNSTREAM_RATE_LIMIT_RPM", "120")))
56+
except ValueError:
57+
return 120
58+
59+
async def dispatch(self, request: Request, call_next) -> Response: # type: ignore[no-untyped-def]
60+
if not _env_truthy("RUNSTREAM_ENABLE_RATE_LIMIT"):
61+
return await call_next(request)
62+
if request.url.path in ("/health", "/docs", "/openapi.json", "/redoc"):
63+
return await call_next(request)
64+
65+
now = time.monotonic()
66+
window = 60.0
67+
rpm = self._rpm()
68+
ip = request.client.host if request.client else "unknown"
69+
70+
bucket = _rate_limit_hits[ip]
71+
bucket[:] = [t for t in bucket if now - t < window]
72+
if len(bucket) >= rpm:
73+
return JSONResponse(
74+
status_code=429,
75+
content={"detail": "rate limit exceeded", "retry_after_sec": 60},
76+
)
77+
bucket.append(now)
78+
79+
return await call_next(request)

src/runstream/parquet_export.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
5+
from .store import connect
6+
7+
8+
def export_runs_parquet(db_path: Path, out_path: Path) -> int:
9+
"""
10+
Dump the `runs` table to Parquet (raw SQLite column layout: JSON as strings).
11+
Requires optional dependency: pip install 'runstream[parquet]'.
12+
"""
13+
try:
14+
import pyarrow as pa
15+
import pyarrow.parquet as pq
16+
except ImportError as e: # pragma: no cover - exercised when pyarrow missing
17+
raise SystemExit(
18+
"Parquet export needs pyarrow. Install: pip install 'runstream[parquet]'"
19+
) from e
20+
21+
conn = connect(db_path)
22+
try:
23+
cur = conn.execute("SELECT * FROM runs ORDER BY started_at DESC")
24+
col_names = [c[0] for c in cur.description]
25+
rows = cur.fetchall()
26+
if not rows:
27+
table = pa.table(
28+
{c: pa.array([], type=pa.string()) for c in col_names}
29+
)
30+
else:
31+
data = {col: [r[col] for r in rows] for col in col_names}
32+
table = pa.table(data)
33+
out_path = out_path.resolve()
34+
out_path.parent.mkdir(parents=True, exist_ok=True)
35+
pq.write_table(table, out_path)
36+
return len(rows)
37+
finally:
38+
conn.close()

tests/conftest.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import pytest
2+
3+
from runstream.http_middleware import clear_rate_limit_state_for_tests
4+
5+
6+
@pytest.fixture(autouse=True)
7+
def _clear_rate_limit_between_tests() -> None:
8+
clear_rate_limit_state_for_tests()
9+
yield
10+
clear_rate_limit_state_for_tests()

tests/test_export_parquet.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from pathlib import Path
2+
3+
import pyarrow.parquet as pq
4+
5+
from runstream.ingest import ingest_path
6+
from runstream.parquet_export import export_runs_parquet
7+
8+
9+
def test_export_parquet_roundtrip(tmp_path: Path) -> None:
10+
db = tmp_path / "t.db"
11+
ingest_path(Path("fixtures/example_run"), db)
12+
out = tmp_path / "runs.parquet"
13+
n = export_runs_parquet(db, out)
14+
assert n >= 1
15+
assert out.is_file()
16+
table = pq.read_table(out)
17+
assert "run_id" in table.column_names
18+
assert table.num_rows == n
19+
20+
21+
def test_export_parquet_empty_db(tmp_path: Path) -> None:
22+
db = tmp_path / "empty.db"
23+
db.touch()
24+
from runstream.store import connect
25+
26+
connect(db).close()
27+
out = tmp_path / "empty.parquet"
28+
n = export_runs_parquet(db, out)
29+
assert n == 0
30+
t = pq.read_table(out)
31+
assert t.num_rows == 0

0 commit comments

Comments
 (0)