Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/codeql/codeql-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# mcp-test CodeQL configuration.
#
# Referenced from .github/workflows/codeql.yml via config-file. Without
# this, CodeQL uses the default suite plus no project-specific tuning.

name: "mcp-test CodeQL"

queries:
- uses: security-and-quality

# Repository-wide query filters. Each entry must justify why a query is
# excluded; "looks scary" is not a reason. The audit logger is the only
# legitimate "Log function with potentially sensitive data" sink in this
# project, and that's by design (forensics over discretion). Adding
# any new Log-named function in this codebase MUST be reviewed against
# this exception, since the rule no longer fires globally.
query-filters:
- exclude:
id: go/clear-text-logging
# Justification: audit.Logger.Log captures full audit_events
# rows (sanitized via redact_keys) by design. CodeQL traces
# err.Error() -> Event.ErrorMessage -> *ev -> Log() and flags
# the whole chain. The error message is what an operator NEEDS
# to see during incident review; suppressing it would defeat
# the audit pipeline. gosec and semgrep still cover other
# cleartext-credential-in-log patterns at the function-call
# level (e.g. fmt.Println, log.Print*).
5 changes: 3 additions & 2 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ jobs:
with:
languages: go
# security-and-quality bundles the security pack with style /
# correctness rules. Findings post to the repo's Security tab.
queries: security-and-quality
# correctness rules. Project-specific query exclusions live
# in the config-file; findings post to the repo's Security tab.
config-file: ./.github/codeql/codeql-config.yml

- name: Autobuild
uses: github/codeql-action/autobuild@95e58e9a2cdfd71adc6e0353d5c52f41a045d225 # v4.35.2
Expand Down
34 changes: 34 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,40 @@ govulncheck: tools-check
## security: gosec + govulncheck
security: gosec govulncheck

## codeql: Run the same CodeQL security-and-quality suite CI runs.
## Requires the codeql CLI on PATH (brew install codeql or
## download from https://github.com/github/codeql-cli-binaries).
## Heavy (~3 min on first run, ~1 min cached). Not part of
## `make verify` by default; run before opening a PR.
##
## The config file at .github/codeql/codeql-config.yml is
## the single source of truth for query exclusions; this
## target uses the same file CI does so local results match.
CODEQL_DB ?= $(BUILD_DIR)/codeql-db
CODEQL_RESULT ?= $(BUILD_DIR)/codeql-results.sarif
codeql:
@command -v codeql >/dev/null 2>&1 || { \
echo "FAIL: codeql CLI not on PATH."; \
echo " brew install codeql"; \
echo " (or fetch from https://github.com/github/codeql-cli-binaries/releases)"; \
exit 1; \
}
@echo "Building CodeQL database (Go) at $(CODEQL_DB)..."
@rm -rf $(CODEQL_DB)
@mkdir -p $(BUILD_DIR)
codeql database create $(CODEQL_DB) --language=go --source-root=. --overwrite
@echo "Analyzing with security-and-quality + project config..."
codeql database analyze $(CODEQL_DB) \
codeql/go-queries:codeql-suites/go-security-and-quality.qls \
--format=sarif-latest \
--output=$(CODEQL_RESULT) \
--threads=0 \
--sarif-category=/language:go
@echo ""
@echo "Filtering against .github/codeql/codeql-config.yml exclusions..."
@./scripts/codeql-gate.sh $(CODEQL_RESULT) .github/codeql/codeql-config.yml
@echo "CodeQL: clean."

COVERAGE_MIN ?= 80

## coverage: Run tests and produce a per-package coverage profile.
Expand Down
42 changes: 42 additions & 0 deletions docs/operations/audit.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,48 @@ JSON string, `?param.code=200` matches the number.
Allowed `has=` columns: `request_params`, `request_headers`,
`response_result`, `response_error`, `notifications`, `replayed_from`.

### Replay a captured call

`POST /api/v1/portal/audit/events/{id}/replay` re-invokes the tool with the same arguments captured on the original event, through an in-process MCP client. The replay produces a new audit row tagged `source=portal-replay` with `replayed_from = {id}`; that row is fired with the portal-authenticated identity, not the original caller's, so an operator can see who triggered the replay.

```bash
# Find a tool error from the last hour that you want to reproduce.
curl -H "X-API-Key: $KEY" \
"$BASE/api/v1/portal/audit/events?response.isError=true&from=$(date -u -v-1H +%FT%TZ)&limit=5" \
| jq -r '.events[].id'

# Replay one. The response includes the new event's id so you can
# follow up with /events/{id}.
curl -X POST -H "X-API-Key: $KEY" -H "X-Requested-With: x" \
"$BASE/api/v1/portal/audit/events/<id>/replay" | jq
```

The replay refuses (`400`) when:

- the original event has no captured payload (capture was disabled when it was written),
- any captured parameter value is the literal `[redacted]` (replaying with a placeholder would mislead about what the call did; re-stage manually via Try-It with the real value),
- the named tool is no longer registered.

A per-identity token bucket (5 burst, ~5/min sustained) protects against runaway replay loops; exhausted callers get `429 Too Many Requests` with a `Retry-After` header.

Replay re-runs the tool's side effects. If the original call wrote to a database, sent a notification, or charged a card, the replay does it again. There is no dry-run mode and no per-tool allow list; if the operator can hit `/replay`, every registered tool is replayable. Treat this like Try-It: a developer affordance for debugging, not a production self-service.

### Live tail

`GET /api/v1/portal/audit/stream` is an SSE endpoint that emits one `event: audit\ndata: <event JSON>` per newly-written audit event. Open the connection, fire calls, watch them flow:

```bash
# In one terminal:
curl -N -H "X-API-Key: $KEY" "$BASE/api/v1/portal/audit/stream"

# In another, fire some tool calls; the first terminal sees them
# arrive within ~200ms of each write.
```

The endpoint emits an opening `: connected` comment so the consumer can detect the connection is live before the first audit row arrives, and a `: keepalive` comment every 30 seconds to keep idle proxies from killing the connection. Subscribers see only events written AFTER they subscribe; for history use `/events` or `/export`.

Slow consumers drop events silently per-subscriber (the producer never blocks). The buffered channel default is 64 events; SSE clients should drain promptly to avoid drops during bursts.

### NDJSON export

`/api/v1/portal/audit/export?format=jsonl` streams summary rows as
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ Behind the cookie or `X-API-Key` / `Authorization: Bearer`.
| `GET` | `/api/v1/portal/tools/{name}` | Same shape, single tool. |
| `GET` | `/api/v1/portal/audit/events` | Paginated audit events. Query: `from`, `to` (RFC 3339), `tool`, `user`, `session`, `success`, `q`, `limit`, `offset`, plus the JSONB filters described below. |
| `GET` | `/api/v1/portal/audit/events/{id}` | Single event by id (UUID); includes the captured payload row when present. 400 on a non-UUID id, 404 when the event isn't recorded. |
| `POST` | `/api/v1/portal/audit/events/{id}/replay` | Re-invokes the captured tool call through an in-process MCP client. Writes a new audit event tagged `source=portal-replay` with `replayed_from` pointing at `{id}`. Per-identity rate limited (5 burst, 1 token / 12s); returns `429 Too Many Requests` with `Retry-After` when exhausted. Refuses (`400`) if the original event has no captured payload, has redacted parameter values, or names a tool no longer registered. CSRF-gated via `X-Requested-With`. |
| `GET` | `/api/v1/portal/audit/export` | NDJSON stream of summary rows for a filter. `format=jsonl` (default) is the only supported format. Same filter surface as `/events`. Capped at 100,000 rows per request. |
| `GET` | `/api/v1/portal/audit/stream` | SSE live tail of new audit events. One `event: audit\ndata: <event JSON>` per write; opening comment `: connected` confirms the connection; `: keepalive` every 30 seconds. Sets `X-Accel-Buffering: no` for nginx-fronted deployments. |
| `GET` | `/api/v1/portal/audit/timeseries` | Bucketed counts. Query: `from`, `to`, `bucket` (Go duration). |
| `GET` | `/api/v1/portal/audit/breakdown` | Group-by aggregations. Query: `by` (`tool`/`user`/`success`/`auth_type`). |
| `GET` | `/api/v1/portal/dashboard` | 1-hour stats + recent activity. |
Expand Down
4 changes: 2 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Build(ctx context.Context, cfg *config.Config, logger *slog.Logger) (*Appli
app.browser = ba
}
// Rebuild the mux with portal handlers attached.
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog)
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog, app.mcpServer, cfg.Audit.RedactKeys)
adminAPI := httpsrv.NewAdminAPI(dbStore, app.mcpServer, auditLog, app.registry, cfg.Audit.RedactKeys)
portalAuth := httpsrv.NewPortalAuth(sessions, chain)
app.mux = buildMuxWithPortal(cfg, app.mcpServer, app.readiness, app.browser, portalAPI, adminAPI, portalAuth)
Expand All @@ -143,7 +143,7 @@ func BuildWithDeps(cfg *config.Config, logger *slog.Logger, chain *auth.Chain, a
if cfg.Portal.CookieSecret != "" {
sessions, _ = httpsrv.NewSessionStore(cfg.Portal.CookieName, cfg.Portal.CookieSecret, false, time.Hour)
}
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog)
portalAPI := httpsrv.NewPortalAPI(cfg, app.registry, auditLog, app.mcpServer, cfg.Audit.RedactKeys)
adminAPI := httpsrv.NewAdminAPI(nil, app.mcpServer, auditLog, app.registry, cfg.Audit.RedactKeys)
portalAuth := httpsrv.NewPortalAuth(sessions, chain)
app.sessions = sessions
Expand Down
90 changes: 90 additions & 0 deletions pkg/audit/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,50 @@ type AsyncLogger struct {

mu sync.Mutex
dropped uint64

// Live-tail subscribers. Mutex-protected for the registry
// itself; sends to the channels are non-blocking so a slow
// consumer can't stall the drain goroutine. Drop counts per
// subscriber are intentionally NOT tracked individually; the
// global Dropped() count covers the buffered-channel-input
// drop, and sse-tail consumers are expected to handle gaps.
subsMu sync.Mutex
subs []*subscriber
}

// subscriber holds a per-consumer channel + a closed flag, both
// protected by mu so a concurrent broadcast and cancel cannot race
// on s.ch (send on closed channel panic / data race detector).
type subscriber struct {
mu sync.Mutex
ch chan Event
closed bool
}

// send attempts a non-blocking send. Caller must NOT hold s.mu.
// Returns silently when the buffer is full (drop) or the subscriber
// has been cancelled (drop).
func (s *subscriber) send(ev Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
select {
case s.ch <- ev:
default:
}
}

// closeOnce closes the channel exactly once. Idempotent.
func (s *subscriber) closeOnce() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.closed = true
close(s.ch)
}

// NewAsyncLogger returns a buffered async wrapper around inner. bufferSize
Expand Down Expand Up @@ -181,6 +225,52 @@ func (a *AsyncLogger) write(ev Event) {
defer cancel()
if err := a.inner.Log(ctx, ev); err != nil {
a.logger.Warn("audit write failed", "tool", ev.ToolName, "err", err)
return
}
// Broadcast successful writes to live-tail subscribers. Done
// after inner.Log() so subscribers only see persisted events;
// a write that errored out doesn't surface to the live tail.
a.broadcast(ev)
}

// Subscribe registers a live-tail consumer and returns the channel
// plus a cancel func. See SubscribingLogger doc for semantics.
//
// buf <= 0 falls back to a sane default (64). Slow consumers cause
// per-subscriber event drops, not producer-side blocking.
func (a *AsyncLogger) Subscribe(buf int) (<-chan Event, func()) {
if buf <= 0 {
buf = 64
}
s := &subscriber{ch: make(chan Event, buf)}
a.subsMu.Lock()
a.subs = append(a.subs, s)
a.subsMu.Unlock()

cancel := func() {
a.subsMu.Lock()
for i, x := range a.subs {
if x == s {
a.subs = append(a.subs[:i], a.subs[i+1:]...)
break
}
}
a.subsMu.Unlock()
s.closeOnce()
}
return s.ch, cancel
}

// broadcast sends ev to every active subscriber, non-blocking. A
// subscriber whose buffer is full silently drops this event. Each
// subscriber's send is gated by its own mutex so a concurrent cancel
// can't close the channel mid-send.
func (a *AsyncLogger) broadcast(ev Event) {
a.subsMu.Lock()
subs := append([]*subscriber{}, a.subs...)
a.subsMu.Unlock()
for _, s := range subs {
s.send(ev)
}
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/audit/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ type StreamingLogger interface {
// and the underlying backend delivers fewer.
const MaxQueryLimit = 1000

// SubscribingLogger is the optional capability for fan-out of newly
// written audit events to live consumers (the SSE live-tail endpoint
// is the primary use). Stores or wrappers that broadcast events on
// Log() implement it; the consumer type-asserts before subscribing.
//
// Semantics:
// - Subscribe returns a receive-only channel of events plus a
// cancel func. The caller MUST call cancel() on disconnect to
// release the slot; otherwise the registry leaks.
// - The channel is buffered with `buf` slots. When a producer
// writes faster than the consumer drains, events are dropped
// for that subscriber (the producer never blocks on a slow
// consumer). Picking buf is a tradeoff between memory and the
// drop rate; 64 is a reasonable starting point for SSE.
// - Subscribers see events that succeeded at the underlying
// backend (in AsyncLogger, the broadcast happens after
// inner.Log() returns nil). Failed writes are not surfaced.
type SubscribingLogger interface {
Subscribe(buf int) (<-chan Event, func())
}

// TimePoint is one bucket of an audit time series.
type TimePoint struct {
Time time.Time `json:"time"`
Expand Down
Loading
Loading