Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/operations/audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,48 @@ JSON string, `?param.code=200` matches the number.
Allowed `has=` columns: `request_params`, `request_headers`,
`response_result`, `response_error`, `notifications`, `replayed_from`.

### Replay a captured call

`POST /api/v1/portal/audit/events/{id}/replay` re-invokes the tool with the same arguments captured on the original event, through an in-process MCP client. The replay produces a new audit row tagged `source=portal-replay` with `replayed_from = {id}`; that row is fired with the portal-authenticated identity, not the original caller's, so an operator can see who triggered the replay.

```bash
# Find a tool error from the last hour that you want to reproduce.
curl -H "X-API-Key: $KEY" \
"$BASE/api/v1/portal/audit/events?response.isError=true&from=$(date -u -v-1H +%FT%TZ)&limit=5" \
| jq -r '.events[].id'

# Replay one. The response includes the new event's id so you can
# follow up with /events/{id}.
curl -X POST -H "X-API-Key: $KEY" -H "X-Requested-With: x" \
"$BASE/api/v1/portal/audit/events/<id>/replay" | jq
```

The replay refuses (`400`) when:

- the original event has no captured payload (capture was disabled when it was written),
- any captured parameter value is the literal `[redacted]` (replaying with a placeholder would mislead about what the call did; re-stage manually via Try-It with the real value),
- the named tool is no longer registered.

A per-identity token bucket (5 burst, ~5/min sustained) protects against runaway replay loops; exhausted callers get `429 Too Many Requests` with a `Retry-After` header.

Replay re-runs the tool's side effects. If the original call wrote to a database, sent a notification, or charged a card, the replay does it again. There is no dry-run mode and no per-tool allow list; if the operator can hit `/replay`, every registered tool is replayable. Treat this like Try-It: a developer affordance for debugging, not a production self-service.

### Live tail

`GET /api/v1/portal/audit/stream` is an SSE endpoint that emits one `event: audit\ndata: <event JSON>` per newly-written audit event. Open the connection, fire calls, watch them flow:

```bash
# In one terminal:
curl -N -H "X-API-Key: $KEY" "$BASE/api/v1/portal/audit/stream"

# In another, fire some tool calls; the first terminal sees them
# arrive within ~200ms of each write.
```

The endpoint emits an opening `: connected` comment so the consumer can detect the connection is live before the first audit row arrives, and a `: keepalive` comment every 30 seconds to keep idle proxies from killing the connection. Subscribers see only events written AFTER they subscribe; for history use `/events` or `/export`.

Slow consumers drop events silently per-subscriber (the producer never blocks). The buffered channel default is 64 events; SSE clients should drain promptly to avoid drops during bursts.

### NDJSON export

`/api/v1/portal/audit/export?format=jsonl` streams summary rows as
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ Behind the cookie or `X-API-Key` / `Authorization: Bearer`.
| `GET` | `/api/v1/portal/tools/{name}` | Same shape, single tool. |
| `GET` | `/api/v1/portal/audit/events` | Paginated audit events. Query: `from`, `to` (RFC 3339), `tool`, `user`, `session`, `success`, `q`, `limit`, `offset`, plus the JSONB filters described below. |
| `GET` | `/api/v1/portal/audit/events/{id}` | Single event by id (UUID); includes the captured payload row when present. 400 on a non-UUID id, 404 when the event isn't recorded. |
| `POST` | `/api/v1/portal/audit/events/{id}/replay` | Re-invokes the captured tool call through an in-process MCP client. Writes a new audit event tagged `source=portal-replay` with `replayed_from` pointing at `{id}`. Per-identity rate limited (5 burst, 1 token / 12s); returns `429 Too Many Requests` with `Retry-After` when exhausted. Refuses (`400`) if the original event has no captured payload, has redacted parameter values, or names a tool no longer registered. CSRF-gated via `X-Requested-With`. |
| `GET` | `/api/v1/portal/audit/export` | NDJSON stream of summary rows for a filter. `format=jsonl` (default) is the only supported format. Same filter surface as `/events`. Capped at 100,000 rows per request. |
| `GET` | `/api/v1/portal/audit/stream` | SSE live tail of new audit events. One `event: audit\ndata: <event JSON>` per write; opening comment `: connected` confirms the connection; `: keepalive` every 30 seconds. Sets `X-Accel-Buffering: no` for nginx-fronted deployments. |
| `GET` | `/api/v1/portal/audit/timeseries` | Bucketed counts. Query: `from`, `to`, `bucket` (Go duration). |
| `GET` | `/api/v1/portal/audit/breakdown` | Group-by aggregations. Query: `by` (`tool`/`user`/`success`/`auth_type`). |
| `GET` | `/api/v1/portal/dashboard` | 1-hour stats + recent activity. |
Expand Down
4 changes: 2 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Build(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*Appli
app.browser = ba
}
// Rebuild the mux with portal handlers attached.
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog)
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog, app.mcpServer, cfg.Audit.RedactKeys)
adminAPI := httpsrv.NewAdminAPI(dbStore, app.mcpServer, auditLog, app.registry, cfg.Audit.RedactKeys)
portalAuth := httpsrv.NewPortalAuth(sessions, chain)
app.mux = buildMuxWithPortal(cfg, app.mcpServer, app.readiness, app.browser, portalAPI, adminAPI, portalAuth)
Expand All @@ -143,7 +143,7 @@ func BuildWithDeps(cfg *config.Config, logger *slog.Logger, chain *auth.Chain, a
if cfg.Portal.CookieSecret != "" {
sessions, _ = httpsrv.NewSessionStore(cfg.Portal.CookieName, cfg.Portal.CookieSecret, false, time.Hour)
}
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog)
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog, app.mcpServer, cfg.Audit.RedactKeys)
adminAPI := httpsrv.NewAdminAPI(nil, app.mcpServer, auditLog, app.registry, cfg.Audit.RedactKeys)
portalAuth := httpsrv.NewPortalAuth(sessions, chain)
app.sessions = sessions
Expand Down
90 changes: 90 additions & 0 deletions pkg/audit/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,50 @@ type AsyncLogger struct {

mu sync.Mutex
dropped uint64

// Live-tail subscribers. Mutex-protected for the registry
// itself; sends to the channels are non-blocking so a slow
// consumer can't stall the drain goroutine. Drop counts per
// subscriber are intentionally NOT tracked individually; the
// global Dropped() count covers the buffered-channel-input
// drop, and sse-tail consumers are expected to handle gaps.
subsMu sync.Mutex
subs []*subscriber
}

// subscriber holds a per-consumer channel + a closed flag, both
// protected by mu so a concurrent broadcast and cancel cannot race
// on s.ch (send on closed channel panic / data race detector).
type subscriber struct {
mu sync.Mutex
ch chan Event
closed bool
}

// send attempts a non-blocking send. Caller must NOT hold s.mu.
// Returns silently when the buffer is full (drop) or the subscriber
// has been cancelled (drop).
func (s *subscriber) send(ev Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
select {
case s.ch <- ev:
default:
}
}

// closeOnce closes the channel exactly once. Idempotent.
func (s *subscriber) closeOnce() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.closed = true
close(s.ch)
}

// NewAsyncLogger returns a buffered async wrapper around inner. bufferSize
Expand Down Expand Up @@ -181,6 +225,52 @@ func (a *AsyncLogger) write(ev Event) {
defer cancel()
if err := a.inner.Log(ctx, ev); err != nil {
a.logger.Warn("audit write failed", "tool", ev.ToolName, "err", err)
return
}
// Broadcast successful writes to live-tail subscribers. Done
// after inner.Log() so subscribers only see persisted events;
// a write that errored out doesn't surface to the live tail.
a.broadcast(ev)
}

// Subscribe registers a live-tail consumer and returns the channel
// plus a cancel func. See SubscribingLogger doc for semantics.
//
// buf <= 0 falls back to a sane default (64). Slow consumers cause
// per-subscriber event drops, not producer-side blocking.
func (a *AsyncLogger) Subscribe(buf int) (<-chan Event, func()) {
if buf <= 0 {
buf = 64
}
s := &subscriber{ch: make(chan Event, buf)}
a.subsMu.Lock()
a.subs = append(a.subs, s)
a.subsMu.Unlock()

cancel := func() {
a.subsMu.Lock()
for i, x := range a.subs {
if x == s {
a.subs = append(a.subs[:i], a.subs[i+1:]...)
break
}
}
a.subsMu.Unlock()
s.closeOnce()
}
return s.ch, cancel
}

// broadcast sends ev to every active subscriber, non-blocking. A
// subscriber whose buffer is full silently drops this event. Each
// subscriber's send is gated by its own mutex so a concurrent cancel
// can't close the channel mid-send.
func (a *AsyncLogger) broadcast(ev Event) {
a.subsMu.Lock()
subs := append([]*subscriber{}, a.subs...)
a.subsMu.Unlock()
for _, s := range subs {
s.send(ev)
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/audit/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ type StreamingLogger interface {
// and the underlying backend delivers fewer.
const MaxQueryLimit = 1000

// SubscribingLogger is the optional capability for fan-out of newly
// written audit events to live consumers (the SSE live-tail endpoint
// is the primary use). Stores or wrappers that broadcast events on
// Log() implement it; the consumer type-asserts before subscribing.
//
// Semantics:
// - Subscribe returns a receive-only channel of events plus a
// cancel func. The caller MUST call cancel() on disconnect to
// release the slot; otherwise the registry leaks.
// - The channel is buffered with `buf` slots. When a producer
// writes faster than the consumer drains, events are dropped
// for that subscriber (the producer never blocks on a slow
// consumer). Picking buf is a tradeoff between memory and the
// drop rate; 64 is a reasonable starting point for SSE.
// - Subscribers see events that succeeded at the underlying
// backend (in AsyncLogger, the broadcast happens after
// inner.Log() returns nil). Failed writes are not surfaced.
type SubscribingLogger interface {
Subscribe(buf int) (<-chan Event, func())
}

// TimePoint is one bucket of an audit time series.
type TimePoint struct {
Time time.Time `json:"time"`
Expand Down
101 changes: 99 additions & 2 deletions pkg/audit/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"sync"
"time"

"github.com/google/uuid"
)

// breakdownKeyFn picks the per-event key used by Breakdown.
Expand All @@ -16,19 +18,99 @@ import (
type MemoryLogger struct {
mu sync.Mutex
events []Event

subsMu sync.Mutex
subs []*memSubscriber
}

// memSubscriber mirrors AsyncLogger's subscriber: per-subscriber mutex
// gates send and close so a cancel doesn't race with an in-flight
// broadcast.
type memSubscriber struct {
mu sync.Mutex
ch chan Event
closed bool
}

func (s *memSubscriber) send(ev Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
select {
case s.ch <- ev:
default:
}
}

func (s *memSubscriber) closeOnce() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.closed = true
close(s.ch)
}

// NewMemoryLogger returns an empty logger.
func NewMemoryLogger() *MemoryLogger { return &MemoryLogger{} }

// Log appends the event.
// Log appends the event and broadcasts to live-tail subscribers.
// Auto-assigns ev.ID when empty, matching the Postgres store's
// behavior so test fixtures see a stable id without setting one
// explicitly.
func (m *MemoryLogger) Log(_ context.Context, ev Event) error {
if ev.ID == "" {
ev.ID = uuid.NewString()
}
m.mu.Lock()
defer m.mu.Unlock()
m.events = append(m.events, ev)
m.mu.Unlock()
m.broadcast(ev)
return nil
}

// Subscribe registers a live-tail consumer. See SubscribingLogger doc.
//
// Used by /audit/stream tests that bypass AsyncLogger and use
// MemoryLogger directly. Same buffered-channel + non-blocking-send
// semantics as AsyncLogger.Subscribe; same per-subscriber mutex
// pattern to keep cancel from racing with broadcast.
func (m *MemoryLogger) Subscribe(buf int) (<-chan Event, func()) {
if buf <= 0 {
buf = 64
}
s := &memSubscriber{ch: make(chan Event, buf)}
m.subsMu.Lock()
m.subs = append(m.subs, s)
m.subsMu.Unlock()

cancel := func() {
m.subsMu.Lock()
for i, x := range m.subs {
if x == s {
m.subs = append(m.subs[:i], m.subs[i+1:]...)
break
}
}
m.subsMu.Unlock()
s.closeOnce()
}
return s.ch, cancel
}

// broadcast sends ev to every active subscriber, non-blocking.
func (m *MemoryLogger) broadcast(ev Event) {
m.subsMu.Lock()
subs := append([]*memSubscriber{}, m.subs...)
m.subsMu.Unlock()
for _, s := range subs {
s.send(ev)
}
}

// Query returns matching events ordered by timestamp DESC. Only ToolName,
// UserID, From, To, Success, and Limit are honored; other filter fields are
// ignored. Sufficient for tests; the Postgres store covers the full filter
Expand Down Expand Up @@ -122,6 +204,21 @@ func (m *MemoryLogger) Stream(ctx context.Context, f QueryFilter, fn func(Event)
return nil
}

// GetPayload returns the in-memory event's Payload pointer, matching
// the PayloadLogger contract used by the portal detail and replay
// endpoints. Returns (nil, nil) when no event with the given id is
// stored, or when the event was logged without a Payload.
func (m *MemoryLogger) GetPayload(_ context.Context, eventID string) (*Payload, error) {
m.mu.Lock()
defer m.mu.Unlock()
for _, ev := range m.events {
if ev.ID == eventID {
return ev.Payload, nil
}
}
return nil, nil
}

// Snapshot returns a copy of all events in insertion order, for assertions.
func (m *MemoryLogger) Snapshot() []Event {
m.mu.Lock()
Expand Down
Loading
Loading