Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/cli/contain/install_review_fixes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func TestRenderedCCLaunch_ParsesUnderBash(t *testing.T) {
scriptPath := filepath.Join(tmp, "plk-launch")
writeScriptFixture(t, scriptPath, renderLaunchWrapper(env))
// bash -n parses without executing. Any syntax error fails the test.
cmd := exec_realCommand(t, "/bin/bash", "-n", scriptPath)
cmd := execRealCommand(t, "/bin/bash", "-n", scriptPath)
if cmd.exit != 0 {
t.Fatalf("bash -n rejected plk-launch:\n%s\n--- script ---\n%s", cmd.output, renderLaunchWrapper(env))
}
Expand Down Expand Up @@ -499,23 +499,23 @@ func TestRenderedCCLaunch_ExecutesUnderBash(t *testing.T) {
tc.mutate()
}
args := append([]string{scriptPath}, tc.argv...)
out := exec_realCommand(t, "/bin/bash", args...)
out := execRealCommand(t, "/bin/bash", args...)
if out.exit != tc.wantExit {
t.Fatalf("exit: got %d, want %d (output: %s)", out.exit, tc.wantExit, out.output)
}
})
}
}

// exec_realCommand runs cmd outside the runner abstraction. Used only for
// execRealCommand runs cmd outside the runner abstraction. Used only for
// the rendered-script smoke test above; this is the one place we need a
// real subprocess and not a fake. Kept tiny on purpose.
type cmdResult struct {
output string
exit int
}

func exec_realCommand(t *testing.T, name string, args ...string) cmdResult { //nolint:revive // test helper
func execRealCommand(t *testing.T, name string, args ...string) cmdResult {
t.Helper()
out, code, err := realRunCommand(context.Background(), name, args...)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/config/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ func (c *Config) policySemanticView() Config {
// max_body_bytes DO affect the envelope contract and stay in view.
view.MediationEnvelope.SigningKeyPath = ""

// MCPToolPolicy.QuarantineDir is an operational filesystem path, not policy.
// Its default is derived from os.TempDir(), so leaving it in the view makes
// the hash depend on the ambient TMPDIR (the same policy resolves to a
// different path across environments), which breaks the admission-grade
// contract that identical policy yields an identical hash. Where redirected
// tool-call payloads land does not change what pipelock decides about a
// scanned request; the tool-policy enabled flag, action, and rules stay in
// view because those do affect detection.
view.MCPToolPolicy.QuarantineDir = ""

// HealthWatchdog is excluded from the canonical hash via the `json:"-"`
// tag on the Config field - operational liveness, not policy. Whether
// the watchdog is enabled or what tick interval it uses does not change
Expand Down
7 changes: 6 additions & 1 deletion internal/config/canonical_golden_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ const (
// bypass for the query parameter entropy gate is a policy-semantic
// change. Empty by default but the field is part of the canonical view.
// Re-bumped for the file_sentry max_file_bytes field: see note above.
goldenHashDefaults = "7b648142b275680696e0d660440a4b456ba3f69e8bbd64d5c34d8050b5913711"
// Bumped when mcp_tool_policy.quarantine_dir was removed from the
// canonical view: its default is derived from os.TempDir(), which made
// this hash depend on the ambient TMPDIR. Excluding the operational path
// makes the hash environment-independent (identical policy -> identical
// hash), which is the admission-grade contract.
goldenHashDefaults = "5e449278f4437345fa3a3fea9dafa3f2b1c73e55bc028c0c35f19af999ad0d4b"

// goldenHashRichConfig pins the hash for goldenRichYAML loaded via
// config.Load, post-ApplyDefaults + Validate. Covers a broad,
Expand Down
35 changes: 35 additions & 0 deletions internal/config/canonical_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ func TestCanonicalPolicyHash_NoiseFieldsDoNotAffect(t *testing.T) {
c.MediationEnvelope.SigningKeyPath = "/etc/pipelock/envelope.key"
},
},
{
name: "mcp_tool_policy.quarantine_dir",
mut: func(c *Config) {
c.MCPToolPolicy.QuarantineDir = "/var/lib/pipelock/quarantine"
},
},
{
name: "flight_recorder.dir",
mut: func(c *Config) { c.FlightRecorder.Dir = "/var/lib/pipelock/fr" },
Expand All @@ -149,6 +155,35 @@ func TestCanonicalPolicyHash_NoiseFieldsDoNotAffect(t *testing.T) {
}
}

// TestCanonicalPolicyHash_DefaultQuarantineDirTMPDIRInvariant proves the
// canonical hash does not move when the default mcp_tool_policy.quarantine_dir
// changes solely because TMPDIR differs. The default is filepath.Join(
// os.TempDir(), "pipelock-quarantine"), so without excluding it from the
// policy view the hash would depend on the ambient environment and the same
// policy would hash differently across machines. Not parallel: it mutates the
// process environment via t.Setenv.
func TestCanonicalPolicyHash_DefaultQuarantineDirTMPDIRInvariant(t *testing.T) {
tmpA := t.TempDir()
tmpB := t.TempDir()

t.Setenv("TMPDIR", tmpA)
cfgA := Defaults()
hashA := cfgA.computeCanonicalPolicyHash()
qDirA := cfgA.MCPToolPolicy.QuarantineDir

t.Setenv("TMPDIR", tmpB)
cfgB := Defaults()
hashB := cfgB.computeCanonicalPolicyHash()
qDirB := cfgB.MCPToolPolicy.QuarantineDir

if qDirA == qDirB {
t.Fatalf("test setup failed: quarantine dirs should differ across TMPDIR, both were %q", qDirA)
}
if hashA != hashB {
t.Fatalf("CanonicalPolicyHash() changed with TMPDIR-derived quarantine_dir: %s != %s", hashA, hashB)
}
}

func TestCanonicalPolicyHash_PolicyFieldsDoAffect(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 2 additions & 1 deletion internal/envelope/signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"errors"
"net/http"
"strings"
"testing"
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestSignRequest_NilSignerReturnsErrSignerDisabled(t *testing.T) {
if err == nil {
t.Fatal("nil signer should return an error")
}
if err != ErrSignerDisabled { //nolint:errorlint // sentinel comparison is intentional
if !errors.Is(err, ErrSignerDisabled) {
t.Errorf("nil signer error = %v, want ErrSignerDisabled", err)
}
}
Expand Down
37 changes: 37 additions & 0 deletions internal/proxy/adaptive_upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package proxy

import (
"github.com/luckyPipewrench/pipelock/internal/audit"
"github.com/luckyPipewrench/pipelock/internal/metrics"
)

// adaptiveUpgrade carries the inputs shared by the adaptive-enforcement audit
// log line and its matching Prometheus counter. The two emissions must always
// move together: the audit log records the per-session escalation decision and
// the counter feeds operator dashboards. Recording one without the other would
// desync the audit trail from the metrics, so they are emitted as one unit.
type adaptiveUpgrade struct {
SessionKey string
Level string
FromAction string
ToAction string
Scanner string
ClientIP string
RequestID string
}

// recordAdaptiveUpgrade emits the adaptive-upgrade audit log line and the
// matching Prometheus counter for a single enforcement upgrade. It is the one
// source of truth for that pair across every transport (fetch, forward,
// CONNECT, WebSocket, TLS intercept), so a change to the audit/metric contract
// is made in one place rather than at ~28 call sites.
//
// m may be nil: the TLS-intercept path holds its proxy-level metrics behind an
// optional proxy reference, and Metrics.RecordAdaptiveUpgrade is itself
// nil-safe, so a nil m skips the counter exactly as the prior per-site
// `if ic.Proxy != nil` guards did. logger is non-nil at every current call
// site; it is dereferenced directly to preserve the existing behavior.
func recordAdaptiveUpgrade(logger *audit.Logger, m *metrics.Metrics, u adaptiveUpgrade) {
logger.LogAdaptiveUpgrade(u.SessionKey, u.Level, u.FromAction, u.ToAction, u.Scanner, u.ClientIP, u.RequestID)
m.RecordAdaptiveUpgrade(u.FromAction, u.ToAction, u.Level)
}
77 changes: 77 additions & 0 deletions internal/proxy/adaptive_upgrade_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package proxy

import (
"testing"

"github.com/luckyPipewrench/pipelock/internal/audit"
"github.com/luckyPipewrench/pipelock/internal/config"
"github.com/luckyPipewrench/pipelock/internal/metrics"
)

// adaptiveUpgradeCounterValue reads the pipelock_adaptive_upgrades_total counter
// for a specific (from_action, to_action, level) label set out of the metrics
// registry, without reaching into unexported fields. Returns 0 when no matching
// series is present.
func adaptiveUpgradeCounterValue(t *testing.T, m *metrics.Metrics, from, to, level string) float64 {
t.Helper()
families, err := m.Registry().Gather()
if err != nil {
t.Fatalf("gather metrics: %v", err)
}
for _, fam := range families {
if fam.GetName() != "pipelock_adaptive_upgrades_total" {
continue
}
for _, mm := range fam.GetMetric() {
labels := map[string]string{}
for _, lp := range mm.GetLabel() {
labels[lp.GetName()] = lp.GetValue()
}
if labels["from_action"] == from && labels["to_action"] == to && labels["level"] == level {
return mm.GetCounter().GetValue()
}
}
}
return 0
}

// TestRecordAdaptiveUpgrade_WiresLogAndMetric proves the helper drives the
// adaptive-upgrade Prometheus counter from the From/To/Level fields of the
// struct. The audit-log emission and the metric emission are fed from the same
// adaptiveUpgrade value, so they cannot disagree on those fields by
// construction; that is the property the helper exists to guarantee.
func TestRecordAdaptiveUpgrade_WiresLogAndMetric(t *testing.T) {
m := metrics.New()
logger := audit.NewNop()

recordAdaptiveUpgrade(logger, m, adaptiveUpgrade{
SessionKey: "agent-a|10.0.0.1",
Level: "L1",
FromAction: config.ActionWarn,
ToAction: config.ActionBlock,
Scanner: "session_deny",
ClientIP: "10.0.0.1",
RequestID: "req-1",
})

if got := adaptiveUpgradeCounterValue(t, m, config.ActionWarn, config.ActionBlock, "L1"); got != 1 {
t.Errorf("adaptive upgrade counter = %v, want 1 for from=warn to=block level=L1", got)
}
}

// TestRecordAdaptiveUpgrade_NilMetricsSafe confirms the helper does not panic
// when metrics are nil, matching the TLS-intercept path where the proxy-level
// metrics handle can be absent. The audit log must still be emitted.
func TestRecordAdaptiveUpgrade_NilMetricsSafe(t *testing.T) {
logger := audit.NewNop()
// Must not panic with a nil *metrics.Metrics.
recordAdaptiveUpgrade(logger, nil, adaptiveUpgrade{
SessionKey: "10.0.0.1",
Level: "L2",
FromAction: "",
ToAction: config.ActionBlock,
Scanner: "session_deny",
ClientIP: "10.0.0.1",
RequestID: "req-2",
})
}
30 changes: 10 additions & 20 deletions internal/proxy/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) {
effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement)
if effectiveAction == config.ActionBlock {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID})
p.logger.LogBlockedDetail(targetCtx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result))
p.metrics.RecordTunnelBlocked(agentLabel)
writeBlockedError(w, blockInfo(result.Scanner),
Expand All @@ -341,8 +340,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) {
// session is at an escalation level with block_all=true.
if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID})
p.metrics.RecordTunnelBlocked(agentLabel)
writeBlockedError(w,
blockInfoFor(blockreason.EscalationLevel, "session_deny"),
Expand Down Expand Up @@ -387,8 +385,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) {
originalCEEAction := ceeAction
ceeAction = decide.UpgradeAction(ceeAction, sr.Level, &cfg.AdaptiveEnforcement)
if ceeAction != originalCEEAction {
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), originalCEEAction, ceeAction, "cross_request_entropy", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade(originalCEEAction, ceeAction, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: originalCEEAction, ToAction: ceeAction, Scanner: "cross_request_entropy", ClientIP: clientIP, RequestID: requestID})
}
if ceeAction == config.ActionBlock {
p.logger.LogBlocked(targetCtx, "cross_request_entropy", detail)
Expand All @@ -410,8 +407,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) {
if cfg.AdaptiveEnforcement.Enabled {
if connectRec != nil {
if decide.UpgradeAction("", connectRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock {
p.logger.LogAdaptiveUpgrade(connectSessionKey, session.EscalationLabel(connectRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(connectRec.EscalationLevel()))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: connectSessionKey, Level: session.EscalationLabel(connectRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID})
p.metrics.RecordTunnelBlocked(agentLabel)
writeBlockedError(w,
blockInfoFor(blockreason.EscalationLevel, "session_deny"),
Expand Down Expand Up @@ -922,8 +918,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement)
if effectiveAction == config.ActionBlock {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID})
p.logger.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result))
p.metrics.RecordBlocked(r.URL.Hostname(), result.Scanner, time.Since(start), agentLabel)
writeBlockedError(w,
Expand All @@ -946,8 +941,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
// session is at an escalation level with block_all=true.
if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID})
p.metrics.RecordBlocked(r.URL.Hostname(), "session_deny", time.Since(start), agentLabel)
writeBlockedError(w,
blockInfoFor(blockreason.EscalationLevel, "session_deny"),
Expand Down Expand Up @@ -1213,8 +1207,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
}
if action != originalBodyAction {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), originalBodyAction, action, scannerLabel, clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade(originalBodyAction, action, session.EscalationLabel(sr.Level))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: originalBodyAction, ToAction: action, Scanner: scannerLabel, ClientIP: clientIP, RequestID: requestID})
}

if action == config.ActionBlock && cfg.EnforceEnabled() {
Expand Down Expand Up @@ -1347,8 +1340,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
if forwardHeaderHadFinding && cfg.AdaptiveEnforcement.Enabled {
if forwardRec != nil {
if decide.UpgradeAction("", forwardRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock {
p.logger.LogAdaptiveUpgrade(forwardSessionKey, session.EscalationLabel(forwardRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(forwardRec.EscalationLevel()))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: forwardSessionKey, Level: session.EscalationLabel(forwardRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID})
writeBlockedError(w,
blockInfoFor(blockreason.EscalationLevel, "session_deny"),
"blocked: session escalation level "+session.EscalationLabel(forwardRec.EscalationLevel()), http.StatusForbidden)
Expand Down Expand Up @@ -1406,8 +1398,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
// live recorder so mid-request escalations are reflected immediately.
fwdRec := sm.GetOrCreate(sessionKey)
if decide.UpgradeAction("", fwdRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock {
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(fwdRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(fwdRec.EscalationLevel()))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(fwdRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID})
p.metrics.RecordBlocked(r.URL.Hostname(), "session_deny", time.Since(start), agentLabel)
writeBlockedError(w,
blockInfoFor(blockreason.EscalationLevel, "session_deny"),
Expand Down Expand Up @@ -2119,8 +2110,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) {
action = decide.UpgradeAction(action, forwardRec.EscalationLevel(), &cfg.AdaptiveEnforcement)
if action != originalAction {
sessionKey := sessionKeyFor(agent, clientIP)
p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(forwardRec.EscalationLevel()), originalAction, action, "response_scan", clientIP, requestID)
p.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(forwardRec.EscalationLevel()))
recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(forwardRec.EscalationLevel()), FromAction: originalAction, ToAction: action, Scanner: "response_scan", ClientIP: clientIP, RequestID: requestID})
}
}

Expand Down
Loading
Loading