From 475aca030d08c0898b70669cece6f41c86e1f12a Mon Sep 17 00:00:00 2001 From: luckyPipewrench Date: Sun, 31 May 2026 10:29:58 -0400 Subject: [PATCH 1/2] refactor(proxy): consolidate adaptive-upgrade log+metric pair; fix TMPDIR-dependent policy hash Adaptive-enforcement upgrades emitted an audit log line (LogAdaptiveUpgrade) immediately followed by a Prometheus counter (RecordAdaptiveUpgrade) at 28 sites across the fetch, forward, CONNECT, WebSocket, and TLS-intercept paths. Each site recomputed the escalation label and repeated the from/to actions for both calls, so the log and metric could drift apart in a future edit. Route all 28 sites through one recordAdaptiveUpgrade helper fed by a single adaptiveUpgrade value, making log/metric consistency structural. The metrics handle is nil-safe so the TLS-intercept optional-proxy path keeps its behavior. Adds a parity test for the wiring and the nil-metrics case. No behavioral change to deny paths. Also exclude mcp_tool_policy.quarantine_dir from the canonical policy hash. Its default is derived from os.TempDir(), so it made the hash depend on the ambient TMPDIR: identical policy produced different hashes across environments, which violates the admission-grade contract and made the golden-defaults hash test fail anywhere TMPDIR != /tmp. The quarantine path is operational, not policy. Golden defaults hash bumped accordingly; the rich-config golden is unchanged because that fixture pins its own quarantine_dir. Plus three lint-hygiene fixes from the tech-debt sweep: extract repeated test literals to consts (drops two nolint:goconst), use errors.Is for a sentinel comparison (drops nolint:errorlint), rename a snake_case test helper (drops nolint:revive). --- .../cli/contain/install_review_fixes_test.go | 8 +- internal/config/canonical.go | 10 +++ internal/config/canonical_golden_test.go | 7 +- internal/config/canonical_test.go | 35 +++++++++ internal/envelope/signer_test.go | 3 +- internal/proxy/adaptive_upgrade.go | 37 +++++++++ internal/proxy/adaptive_upgrade_test.go | 77 +++++++++++++++++++ internal/proxy/forward.go | 30 +++----- internal/proxy/intercept.go | 20 +++-- internal/proxy/proxy.go | 15 ++-- internal/proxy/websocket.go | 27 +++---- internal/sentry/scrub_test.go | 27 ++++--- 12 files changed, 223 insertions(+), 73 deletions(-) create mode 100644 internal/proxy/adaptive_upgrade.go create mode 100644 internal/proxy/adaptive_upgrade_test.go diff --git a/internal/cli/contain/install_review_fixes_test.go b/internal/cli/contain/install_review_fixes_test.go index 90e7ffd3..fdb03971 100644 --- a/internal/cli/contain/install_review_fixes_test.go +++ b/internal/cli/contain/install_review_fixes_test.go @@ -439,7 +439,7 @@ func TestRenderedCCLaunch_ParsesUnderBash(t *testing.T) { scriptPath := filepath.Join(tmp, "plk-launch") writeScriptFixture(t, scriptPath, renderLaunchWrapper(env)) // bash -n parses without executing. Any syntax error fails the test. - cmd := exec_realCommand(t, "/bin/bash", "-n", scriptPath) + cmd := execRealCommand(t, "/bin/bash", "-n", scriptPath) if cmd.exit != 0 { t.Fatalf("bash -n rejected plk-launch:\n%s\n--- script ---\n%s", cmd.output, renderLaunchWrapper(env)) } @@ -499,7 +499,7 @@ func TestRenderedCCLaunch_ExecutesUnderBash(t *testing.T) { tc.mutate() } args := append([]string{scriptPath}, tc.argv...) - out := exec_realCommand(t, "/bin/bash", args...) + out := execRealCommand(t, "/bin/bash", args...) if out.exit != tc.wantExit { t.Fatalf("exit: got %d, want %d (output: %s)", out.exit, tc.wantExit, out.output) } @@ -507,7 +507,7 @@ func TestRenderedCCLaunch_ExecutesUnderBash(t *testing.T) { } } -// exec_realCommand runs cmd outside the runner abstraction. Used only for +// execRealCommand runs cmd outside the runner abstraction. Used only for // the rendered-script smoke test above; this is the one place we need a // real subprocess and not a fake. Kept tiny on purpose. type cmdResult struct { @@ -515,7 +515,7 @@ type cmdResult struct { exit int } -func exec_realCommand(t *testing.T, name string, args ...string) cmdResult { //nolint:revive // test helper +func execRealCommand(t *testing.T, name string, args ...string) cmdResult { t.Helper() out, code, err := realRunCommand(context.Background(), name, args...) if err != nil { diff --git a/internal/config/canonical.go b/internal/config/canonical.go index e9ec862d..ba7114e9 100644 --- a/internal/config/canonical.go +++ b/internal/config/canonical.go @@ -170,6 +170,16 @@ func (c *Config) policySemanticView() Config { // max_body_bytes DO affect the envelope contract and stay in view. view.MediationEnvelope.SigningKeyPath = "" + // MCPToolPolicy.QuarantineDir is an operational filesystem path, not policy. + // Its default is derived from os.TempDir(), so leaving it in the view makes + // the hash depend on the ambient TMPDIR (the same policy resolves to a + // different path across environments), which breaks the admission-grade + // contract that identical policy yields an identical hash. Where redirected + // tool-call payloads land does not change what pipelock decides about a + // scanned request; the tool-policy enabled flag, action, and rules stay in + // view because those do affect detection. + view.MCPToolPolicy.QuarantineDir = "" + // HealthWatchdog is excluded from the canonical hash via the `json:"-"` // tag on the Config field - operational liveness, not policy. Whether // the watchdog is enabled or what tick interval it uses does not change diff --git a/internal/config/canonical_golden_test.go b/internal/config/canonical_golden_test.go index 8d774277..69ae5fcc 100644 --- a/internal/config/canonical_golden_test.go +++ b/internal/config/canonical_golden_test.go @@ -131,7 +131,12 @@ const ( // bypass for the query parameter entropy gate is a policy-semantic // change. Empty by default but the field is part of the canonical view. // Re-bumped for the file_sentry max_file_bytes field: see note above. - goldenHashDefaults = "7b648142b275680696e0d660440a4b456ba3f69e8bbd64d5c34d8050b5913711" + // Bumped when mcp_tool_policy.quarantine_dir was removed from the + // canonical view: its default is derived from os.TempDir(), which made + // this hash depend on the ambient TMPDIR. Excluding the operational path + // makes the hash environment-independent (identical policy -> identical + // hash), which is the admission-grade contract. + goldenHashDefaults = "5e449278f4437345fa3a3fea9dafa3f2b1c73e55bc028c0c35f19af999ad0d4b" // goldenHashRichConfig pins the hash for goldenRichYAML loaded via // config.Load, post-ApplyDefaults + Validate. Covers a broad, diff --git a/internal/config/canonical_test.go b/internal/config/canonical_test.go index f6754b74..0ff2d3d6 100644 --- a/internal/config/canonical_test.go +++ b/internal/config/canonical_test.go @@ -125,6 +125,12 @@ func TestCanonicalPolicyHash_NoiseFieldsDoNotAffect(t *testing.T) { c.MediationEnvelope.SigningKeyPath = "/etc/pipelock/envelope.key" }, }, + { + name: "mcp_tool_policy.quarantine_dir", + mut: func(c *Config) { + c.MCPToolPolicy.QuarantineDir = "/var/lib/pipelock/quarantine" + }, + }, { name: "flight_recorder.dir", mut: func(c *Config) { c.FlightRecorder.Dir = "/var/lib/pipelock/fr" }, @@ -149,6 +155,35 @@ func TestCanonicalPolicyHash_NoiseFieldsDoNotAffect(t *testing.T) { } } +// TestCanonicalPolicyHash_DefaultQuarantineDirTMPDIRInvariant proves the +// canonical hash does not move when the default mcp_tool_policy.quarantine_dir +// changes solely because TMPDIR differs. The default is filepath.Join( +// os.TempDir(), "pipelock-quarantine"), so without excluding it from the +// policy view the hash would depend on the ambient environment and the same +// policy would hash differently across machines. Not parallel: it mutates the +// process environment via t.Setenv. +func TestCanonicalPolicyHash_DefaultQuarantineDirTMPDIRInvariant(t *testing.T) { + tmpA := t.TempDir() + tmpB := t.TempDir() + + t.Setenv("TMPDIR", tmpA) + cfgA := Defaults() + hashA := cfgA.computeCanonicalPolicyHash() + qDirA := cfgA.MCPToolPolicy.QuarantineDir + + t.Setenv("TMPDIR", tmpB) + cfgB := Defaults() + hashB := cfgB.computeCanonicalPolicyHash() + qDirB := cfgB.MCPToolPolicy.QuarantineDir + + if qDirA == qDirB { + t.Fatalf("test setup failed: quarantine dirs should differ across TMPDIR, both were %q", qDirA) + } + if hashA != hashB { + t.Fatalf("CanonicalPolicyHash() changed with TMPDIR-derived quarantine_dir: %s != %s", hashA, hashB) + } +} + func TestCanonicalPolicyHash_PolicyFieldsDoAffect(t *testing.T) { t.Parallel() diff --git a/internal/envelope/signer_test.go b/internal/envelope/signer_test.go index f63a9481..a365fa12 100644 --- a/internal/envelope/signer_test.go +++ b/internal/envelope/signer_test.go @@ -9,6 +9,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/base64" + "errors" "net/http" "strings" "testing" @@ -138,7 +139,7 @@ func TestSignRequest_NilSignerReturnsErrSignerDisabled(t *testing.T) { if err == nil { t.Fatal("nil signer should return an error") } - if err != ErrSignerDisabled { //nolint:errorlint // sentinel comparison is intentional + if !errors.Is(err, ErrSignerDisabled) { t.Errorf("nil signer error = %v, want ErrSignerDisabled", err) } } diff --git a/internal/proxy/adaptive_upgrade.go b/internal/proxy/adaptive_upgrade.go new file mode 100644 index 00000000..d493a076 --- /dev/null +++ b/internal/proxy/adaptive_upgrade.go @@ -0,0 +1,37 @@ +package proxy + +import ( + "github.com/luckyPipewrench/pipelock/internal/audit" + "github.com/luckyPipewrench/pipelock/internal/metrics" +) + +// adaptiveUpgrade carries the inputs shared by the adaptive-enforcement audit +// log line and its matching Prometheus counter. The two emissions must always +// move together: the audit log records the per-session escalation decision and +// the counter feeds operator dashboards. Recording one without the other would +// desync the audit trail from the metrics, so they are emitted as one unit. +type adaptiveUpgrade struct { + SessionKey string + Level string + FromAction string + ToAction string + Scanner string + ClientIP string + RequestID string +} + +// recordAdaptiveUpgrade emits the adaptive-upgrade audit log line and the +// matching Prometheus counter for a single enforcement upgrade. It is the one +// source of truth for that pair across every transport (fetch, forward, +// CONNECT, WebSocket, TLS intercept), so a change to the audit/metric contract +// is made in one place rather than at ~28 call sites. +// +// m may be nil: the TLS-intercept path holds its proxy-level metrics behind an +// optional proxy reference, and Metrics.RecordAdaptiveUpgrade is itself +// nil-safe, so a nil m skips the counter exactly as the prior per-site +// `if ic.Proxy != nil` guards did. logger is non-nil at every current call +// site; it is dereferenced directly to preserve the existing behavior. +func recordAdaptiveUpgrade(logger *audit.Logger, m *metrics.Metrics, u adaptiveUpgrade) { + logger.LogAdaptiveUpgrade(u.SessionKey, u.Level, u.FromAction, u.ToAction, u.Scanner, u.ClientIP, u.RequestID) + m.RecordAdaptiveUpgrade(u.FromAction, u.ToAction, u.Level) +} diff --git a/internal/proxy/adaptive_upgrade_test.go b/internal/proxy/adaptive_upgrade_test.go new file mode 100644 index 00000000..a67e902d --- /dev/null +++ b/internal/proxy/adaptive_upgrade_test.go @@ -0,0 +1,77 @@ +package proxy + +import ( + "testing" + + "github.com/luckyPipewrench/pipelock/internal/audit" + "github.com/luckyPipewrench/pipelock/internal/config" + "github.com/luckyPipewrench/pipelock/internal/metrics" +) + +// adaptiveUpgradeCounterValue reads the pipelock_adaptive_upgrades_total counter +// for a specific (from_action, to_action, level) label set out of the metrics +// registry, without reaching into unexported fields. Returns 0 when no matching +// series is present. +func adaptiveUpgradeCounterValue(t *testing.T, m *metrics.Metrics, from, to, level string) float64 { + t.Helper() + families, err := m.Registry().Gather() + if err != nil { + t.Fatalf("gather metrics: %v", err) + } + for _, fam := range families { + if fam.GetName() != "pipelock_adaptive_upgrades_total" { + continue + } + for _, mm := range fam.GetMetric() { + labels := map[string]string{} + for _, lp := range mm.GetLabel() { + labels[lp.GetName()] = lp.GetValue() + } + if labels["from_action"] == from && labels["to_action"] == to && labels["level"] == level { + return mm.GetCounter().GetValue() + } + } + } + return 0 +} + +// TestRecordAdaptiveUpgrade_WiresLogAndMetric proves the helper drives the +// adaptive-upgrade Prometheus counter from the From/To/Level fields of the +// struct. The audit-log emission and the metric emission are fed from the same +// adaptiveUpgrade value, so they cannot disagree on those fields by +// construction; that is the property the helper exists to guarantee. +func TestRecordAdaptiveUpgrade_WiresLogAndMetric(t *testing.T) { + m := metrics.New() + logger := audit.NewNop() + + recordAdaptiveUpgrade(logger, m, adaptiveUpgrade{ + SessionKey: "agent-a|10.0.0.1", + Level: "L1", + FromAction: config.ActionWarn, + ToAction: config.ActionBlock, + Scanner: "session_deny", + ClientIP: "10.0.0.1", + RequestID: "req-1", + }) + + if got := adaptiveUpgradeCounterValue(t, m, config.ActionWarn, config.ActionBlock, "L1"); got != 1 { + t.Errorf("adaptive upgrade counter = %v, want 1 for from=warn to=block level=L1", got) + } +} + +// TestRecordAdaptiveUpgrade_NilMetricsSafe confirms the helper does not panic +// when metrics are nil, matching the TLS-intercept path where the proxy-level +// metrics handle can be absent. The audit log must still be emitted. +func TestRecordAdaptiveUpgrade_NilMetricsSafe(t *testing.T) { + logger := audit.NewNop() + // Must not panic with a nil *metrics.Metrics. + recordAdaptiveUpgrade(logger, nil, adaptiveUpgrade{ + SessionKey: "10.0.0.1", + Level: "L2", + FromAction: "", + ToAction: config.ActionBlock, + Scanner: "session_deny", + ClientIP: "10.0.0.1", + RequestID: "req-2", + }) +} diff --git a/internal/proxy/forward.go b/internal/proxy/forward.go index 7ec941a6..8930f096 100644 --- a/internal/proxy/forward.go +++ b/internal/proxy/forward.go @@ -318,8 +318,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID}) p.logger.LogBlockedDetail(targetCtx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) p.metrics.RecordTunnelBlocked(agentLabel) writeBlockedError(w, blockInfo(result.Scanner), @@ -341,8 +340,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordTunnelBlocked(agentLabel) writeBlockedError(w, blockInfoFor(blockreason.EscalationLevel, "session_deny"), @@ -387,8 +385,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { originalCEEAction := ceeAction ceeAction = decide.UpgradeAction(ceeAction, sr.Level, &cfg.AdaptiveEnforcement) if ceeAction != originalCEEAction { - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), originalCEEAction, ceeAction, "cross_request_entropy", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(originalCEEAction, ceeAction, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: originalCEEAction, ToAction: ceeAction, Scanner: "cross_request_entropy", ClientIP: clientIP, RequestID: requestID}) } if ceeAction == config.ActionBlock { p.logger.LogBlocked(targetCtx, "cross_request_entropy", detail) @@ -410,8 +407,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { if cfg.AdaptiveEnforcement.Enabled { if connectRec != nil { if decide.UpgradeAction("", connectRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock { - p.logger.LogAdaptiveUpgrade(connectSessionKey, session.EscalationLabel(connectRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(connectRec.EscalationLevel())) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: connectSessionKey, Level: session.EscalationLabel(connectRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordTunnelBlocked(agentLabel) writeBlockedError(w, blockInfoFor(blockreason.EscalationLevel, "session_deny"), @@ -922,8 +918,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID}) p.logger.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) p.metrics.RecordBlocked(r.URL.Hostname(), result.Scanner, time.Since(start), agentLabel) writeBlockedError(w, @@ -946,8 +941,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordBlocked(r.URL.Hostname(), "session_deny", time.Since(start), agentLabel) writeBlockedError(w, blockInfoFor(blockreason.EscalationLevel, "session_deny"), @@ -1213,8 +1207,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { } if action != originalBodyAction { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), originalBodyAction, action, scannerLabel, clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(originalBodyAction, action, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: originalBodyAction, ToAction: action, Scanner: scannerLabel, ClientIP: clientIP, RequestID: requestID}) } if action == config.ActionBlock && cfg.EnforceEnabled() { @@ -1347,8 +1340,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { if forwardHeaderHadFinding && cfg.AdaptiveEnforcement.Enabled { if forwardRec != nil { if decide.UpgradeAction("", forwardRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock { - p.logger.LogAdaptiveUpgrade(forwardSessionKey, session.EscalationLabel(forwardRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(forwardRec.EscalationLevel())) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: forwardSessionKey, Level: session.EscalationLabel(forwardRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) writeBlockedError(w, blockInfoFor(blockreason.EscalationLevel, "session_deny"), "blocked: session escalation level "+session.EscalationLabel(forwardRec.EscalationLevel()), http.StatusForbidden) @@ -1406,8 +1398,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { // live recorder so mid-request escalations are reflected immediately. fwdRec := sm.GetOrCreate(sessionKey) if decide.UpgradeAction("", fwdRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock { - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(fwdRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(fwdRec.EscalationLevel())) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(fwdRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordBlocked(r.URL.Hostname(), "session_deny", time.Since(start), agentLabel) writeBlockedError(w, blockInfoFor(blockreason.EscalationLevel, "session_deny"), @@ -2119,8 +2110,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { action = decide.UpgradeAction(action, forwardRec.EscalationLevel(), &cfg.AdaptiveEnforcement) if action != originalAction { sessionKey := sessionKeyFor(agent, clientIP) - p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(forwardRec.EscalationLevel()), originalAction, action, "response_scan", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(forwardRec.EscalationLevel())) + recordAdaptiveUpgrade(p.logger, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(forwardRec.EscalationLevel()), FromAction: originalAction, ToAction: action, Scanner: "response_scan", ClientIP: clientIP, RequestID: requestID}) } } diff --git a/internal/proxy/intercept.go b/internal/proxy/intercept.go index d49a29fb..aafcb721 100644 --- a/internal/proxy/intercept.go +++ b/internal/proxy/intercept.go @@ -604,10 +604,11 @@ func newInterceptHandler( effectiveAction := decide.UpgradeAction(baseAction, recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) - ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), baseAction, effectiveAction, urlResult.Scanner, ic.ClientIP, ic.RequestID) + var m *metrics.Metrics if ic.Proxy != nil { - ic.Proxy.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(recEscalationLevel(ic.Recorder))) + m = ic.Proxy.metrics } + recordAdaptiveUpgrade(ic.Logger, m, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(recEscalationLevel(ic.Recorder)), FromAction: baseAction, ToAction: effectiveAction, Scanner: urlResult.Scanner, ClientIP: ic.ClientIP, RequestID: ic.RequestID}) switch { case urlResult.IsInfrastructureError(): // Score-neutral: see scan path above for rationale. @@ -821,10 +822,11 @@ func newInterceptHandler( } if action != originalBodyAction { sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) - ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), originalBodyAction, action, scannerLabel, ic.ClientIP, ic.RequestID) + var m *metrics.Metrics if ic.Proxy != nil { - ic.Proxy.metrics.RecordAdaptiveUpgrade(originalBodyAction, action, session.EscalationLabel(recEscalationLevel(ic.Recorder))) + m = ic.Proxy.metrics } + recordAdaptiveUpgrade(ic.Logger, m, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(recEscalationLevel(ic.Recorder)), FromAction: originalBodyAction, ToAction: action, Scanner: scannerLabel, ClientIP: ic.ClientIP, RequestID: ic.RequestID}) } // Fail-closed transport errors (consumed-but-unreplayable body) @@ -1103,10 +1105,11 @@ func newInterceptHandler( // session is at an escalation level with block_all=true. if ic.Recorder != nil && decide.UpgradeAction("", recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) - ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), "", config.ActionBlock, "session_deny", ic.ClientIP, ic.RequestID) + var m *metrics.Metrics if ic.Proxy != nil { - ic.Proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(recEscalationLevel(ic.Recorder))) + m = ic.Proxy.metrics } + recordAdaptiveUpgrade(ic.Logger, m, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(recEscalationLevel(ic.Recorder)), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: ic.ClientIP, RequestID: ic.RequestID}) ic.Metrics.RecordTLSRequestBlocked("session_deny") interceptEmitReceipt(ic, withInterceptRedaction(receipt.EmitOpts{ ActionID: actionID, @@ -1709,10 +1712,11 @@ func newInterceptHandler( } if action != originalAction { sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) - ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), originalAction, action, "response_scan", ic.ClientIP, ic.RequestID) + var m *metrics.Metrics if ic.Proxy != nil { - ic.Proxy.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(recEscalationLevel(ic.Recorder))) + m = ic.Proxy.metrics } + recordAdaptiveUpgrade(ic.Logger, m, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(recEscalationLevel(ic.Recorder)), FromAction: originalAction, ToAction: action, Scanner: "response_scan", ClientIP: ic.ClientIP, RequestID: ic.RequestID}) } patternNames := make([]string, len(scanResult.Matches)) for i, match := range scanResult.Matches { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 716a2e41..663b5d4e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -3213,8 +3213,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID}) log.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) p.metrics.RecordBlocked(parsed.Hostname(), result.Scanner, time.Since(start), agentLabel) p.emitReceipt(receipt.EmitOpts{ @@ -3295,8 +3294,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { // with an empty base action returns "block" only when block_all is set. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), Verdict: config.ActionBlock, @@ -3411,8 +3409,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { if fetchRec != nil && cfg.AdaptiveEnforcement.Enabled && decide.UpgradeAction("", fetchRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock { headerSessionKey := CeeSessionKey(agent, clientIP) - log.LogAdaptiveUpgrade(headerSessionKey, session.EscalationLabel(fetchRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(fetchRec.EscalationLevel())) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: headerSessionKey, Level: session.EscalationLabel(fetchRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), Verdict: config.ActionBlock, @@ -3563,8 +3560,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { // Re-check block_all after CEE may have escalated the session. Use the // live recorder so mid-request escalations are reflected immediately. if fetchRec != nil && decide.UpgradeAction("", fetchRec.EscalationLevel(), &cfg.AdaptiveEnforcement) == config.ActionBlock { - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(fetchRec.EscalationLevel()), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(fetchRec.EscalationLevel())) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(fetchRec.EscalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), Verdict: config.ActionBlock, @@ -4235,8 +4231,7 @@ func (p *Proxy) filterAndActOnResponseScan( } if action != originalAction { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sessionLevel), originalAction, action, "response_scan", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(sessionLevel)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sessionLevel), FromAction: originalAction, ToAction: action, Scanner: "response_scan", ClientIP: clientIP, RequestID: requestID}) } // recordResponseSignal records an adaptive enforcement signal for the diff --git a/internal/proxy/websocket.go b/internal/proxy/websocket.go index ac5f4340..25fc145a 100644 --- a/internal/proxy/websocket.go +++ b/internal/proxy/websocket.go @@ -297,8 +297,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: baseAction, ToAction: effectiveAction, Scanner: result.Scanner, ClientIP: clientIP, RequestID: requestID}) log.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) p.metrics.RecordWSBlocked() p.emitReceipt(receipt.EmitOpts{ @@ -342,8 +341,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(sr.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordWSBlocked() p.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), @@ -457,8 +455,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { if cfg.AdaptiveEnforcement.Enabled && headerSR.Level > 0 && decide.UpgradeAction("", headerSR.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(agent, clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(headerSR.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) - p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(headerSR.Level)) + recordAdaptiveUpgrade(log, p.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(headerSR.Level), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: clientIP, RequestID: requestID}) p.metrics.RecordWSBlocked() p.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), @@ -1171,8 +1168,7 @@ func (r *wsRelay) handleClientTextFindings(log *audit.Logger, dlpMatches []scann if effectiveAction == config.ActionBlock { r.recordSignal(session.SignalBlock, log) sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), baseAction, effectiveAction, audit.ScannerDLP, r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: baseAction, ToAction: effectiveAction, Scanner: audit.ScannerDLP, ClientIP: r.clientIP, RequestID: r.requestID}) reason := fmt.Sprintf("DLP match: %s (escalated)", strings.Join(names, ", ")) log.LogWSBlocked(r.targetURL, audit.DirectionClientToServer, audit.ScannerDLP, reason, r.clientIP, r.requestID) r.proxy.metrics.RecordWSScanHit(audit.ScannerDLP) @@ -1224,8 +1220,7 @@ func (r *wsRelay) handleClientTextFindings(log *audit.Logger, dlpMatches []scann addrAction = decide.UpgradeAction(addrAction, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) if addrAction != originalAddrAction { sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalAddrAction, addrAction, scannerLabelAddressProtection, r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade(originalAddrAction, addrAction, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: originalAddrAction, ToAction: addrAction, Scanner: scannerLabelAddressProtection, ClientIP: r.clientIP, RequestID: r.requestID}) } if r.cfg.EnforceEnabled() && addrAction == config.ActionBlock { r.recordSignal(session.SignalBlock, log) @@ -1489,8 +1484,7 @@ func (r *wsRelay) handleClientMessageBodyResult(log *audit.Logger, bodyBytes []b action = decide.UpgradeAction(action, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) if action != originalAction { sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalAction, action, scannerLabel, r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: originalAction, ToAction: action, Scanner: scannerLabel, ClientIP: r.clientIP, RequestID: r.requestID}) } switch action { @@ -1628,8 +1622,7 @@ func (r *wsRelay) clientToUpstream(ctx context.Context, cancel context.CancelFun // clean frames from flowing after escalation during long-lived connections. if decide.UpgradeAction("", r.escalationLevel(), &r.cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), "", config.ActionBlock, "session_deny", r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: r.clientIP, RequestID: r.requestID}) r.terminalOnce.Do(func() { r.proxy.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), @@ -1956,8 +1949,7 @@ func (r *wsRelay) upstreamToClient(ctx context.Context, cancel context.CancelFun // block_all=true, close the WebSocket immediately. if decide.UpgradeAction("", r.escalationLevel(), &r.cfg.AdaptiveEnforcement) == config.ActionBlock { sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), "", config.ActionBlock, "session_deny", r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: "", ToAction: config.ActionBlock, Scanner: "session_deny", ClientIP: r.clientIP, RequestID: r.requestID}) r.terminalOnce.Do(func() { r.proxy.emitReceipt(receipt.EmitOpts{ ActionID: receipt.NewActionID(), @@ -2167,8 +2159,7 @@ func (r *wsRelay) upstreamToClient(ctx context.Context, cancel context.CancelFun } if wsAction != originalWSAction { sessionKey := sessionKeyFor(r.agent, r.clientIP) - log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalWSAction, wsAction, "response_scan", r.clientIP, r.requestID) - r.proxy.metrics.RecordAdaptiveUpgrade(originalWSAction, wsAction, session.EscalationLabel(r.escalationLevel())) + recordAdaptiveUpgrade(log, r.proxy.metrics, adaptiveUpgrade{SessionKey: sessionKey, Level: session.EscalationLabel(r.escalationLevel()), FromAction: originalWSAction, ToAction: wsAction, Scanner: "response_scan", ClientIP: r.clientIP, RequestID: r.requestID}) } switch wsAction { diff --git a/internal/sentry/scrub_test.go b/internal/sentry/scrub_test.go index 40967ec9..61bdb57a 100644 --- a/internal/sentry/scrub_test.go +++ b/internal/sentry/scrub_test.go @@ -9,6 +9,11 @@ import ( "github.com/luckyPipewrench/pipelock/internal/config" ) +const ( + testEnvSecret = "my-super-secret-value-12345" + testAWSKeyID = "AKIA" + "IOSFODNN7EXAMPLE" // split to dodge gosec G101 +) + func testDLPPatterns() []config.DLPPattern { return []config.DLPPattern{ {Name: "AWS Access Key", Regex: `AKIA[0-9A-Z]{16}`, Severity: "critical"}, @@ -24,7 +29,7 @@ func TestScrubString_DLPPatterns(t *testing.T) { name string input string }{ - {"AWS key", "error at url with " + "AKIA" + "IOSFODNN7EXAMPLE"}, + {"AWS key", "error at url with " + testAWSKeyID}, {"GitHub token", "failed for " + "ghp_" + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij"}, {"Anthropic key", "request to " + "sk-ant-" + "api03-abcdef1234"}, {"Bearer token", "Authorization header Bearer " + "eyJhbGciOiJIUzI1NiJ9.test"}, @@ -103,7 +108,7 @@ func TestScrubString_EmptyString(t *testing.T) { } func TestScrubString_EnvSecrets(t *testing.T) { - secret := "my-super-secret-value-12345" //nolint:goconst // test value + secret := testEnvSecret s := NewScrubber(nil, []string{secret}) input := "error: env value was " + secret + " in context" result := s.ScrubString(input) @@ -125,7 +130,7 @@ func TestScrubString_URLQueryParams(t *testing.T) { } func TestScrubEvent_Message(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" //nolint:goconst // test value + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Message: "error with key " + awsKey, @@ -141,7 +146,7 @@ func TestScrubEvent_Message(t *testing.T) { } func TestScrubEvent_Exception(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Exception: []sentry.Exception{ @@ -165,7 +170,7 @@ func TestScrubEvent_Exception(t *testing.T) { } func TestScrubEvent_Breadcrumbs(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Breadcrumbs: []*sentry.Breadcrumb{ @@ -182,7 +187,7 @@ func TestScrubEvent_Breadcrumbs(t *testing.T) { } func TestScrubEvent_Tags(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Tags: map[string]string{"url": "https://api.example.com/" + awsKey}, @@ -263,7 +268,7 @@ func TestScrubEvent_ServerNameWiped(t *testing.T) { } func TestScrubEvent_ExceptionType(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Exception: []sentry.Exception{ @@ -280,7 +285,7 @@ func TestScrubEvent_ExceptionType(t *testing.T) { } func TestScrubEvent_Transaction(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Transaction: "/api/fetch?key=" + awsKey, @@ -292,7 +297,7 @@ func TestScrubEvent_Transaction(t *testing.T) { } func TestScrubEvent_Fingerprint(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Fingerprint: []string{"error-group", "key=" + awsKey}, @@ -326,7 +331,7 @@ func TestScrubEvent_BreadcrumbDataNonStringDeleted(t *testing.T) { } func TestScrubEvent_ContextsStringScrubbed(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Contexts: map[string]sentry.Context{ @@ -394,7 +399,7 @@ func TestScrubEvent_VarsNonStringDeleted(t *testing.T) { } func TestScrubEvent_ThreadsVarsScrubbed(t *testing.T) { - awsKey := "AKIA" + "IOSFODNN7EXAMPLE" + awsKey := testAWSKeyID s := NewScrubber(testDLPPatterns(), nil) event := &sentry.Event{ Threads: []sentry.Thread{ From 02c7228437f992594f098bc1a9df7144a2dcc01a Mon Sep 17 00:00:00 2001 From: luckyPipewrench Date: Sun, 31 May 2026 11:48:01 -0400 Subject: [PATCH 2/2] test(proxy): cover WS relay adaptive-upgrade branches Add tests driving the three WebSocket relay warn->block adaptive-upgrade branches (client-text DLP, address-protection, and request-body findings) with an escalated session in audit mode, where escalation flips the warn action to block. Raises codecov/patch on the adaptive-upgrade helper consolidation from 70% to ~94%, over the 75% target. The remaining uncovered recordAdaptiveUpgrade sites are secondary session_deny recheck branches reached only after an earlier identical block_all check returns first; covering them in isolation is not feasible without disabling the prior check. --- .../proxy/websocket_adaptive_upgrade_test.go | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 internal/proxy/websocket_adaptive_upgrade_test.go diff --git a/internal/proxy/websocket_adaptive_upgrade_test.go b/internal/proxy/websocket_adaptive_upgrade_test.go new file mode 100644 index 00000000..f57a3559 --- /dev/null +++ b/internal/proxy/websocket_adaptive_upgrade_test.go @@ -0,0 +1,139 @@ +// Copyright 2026 Josh Waldrep +// SPDX-License-Identifier: Apache-2.0 + +package proxy + +import ( + "testing" + + "github.com/luckyPipewrench/pipelock/internal/addressprotect" + "github.com/luckyPipewrench/pipelock/internal/audit" + "github.com/luckyPipewrench/pipelock/internal/config" + "github.com/luckyPipewrench/pipelock/internal/metrics" + "github.com/luckyPipewrench/pipelock/internal/scanner" +) + +// newAdaptiveWSRelay builds a wsRelay wired to a session escalated to the given +// level, with adaptive enforcement on and enforce=false (audit mode). In audit +// mode a warn-action finding is not hard-blocked, so the relay reaches the +// adaptive-upgrade branch where escalation flips warn -> block. Returns the +// relay plus its session manager and metrics for assertions. +func newAdaptiveWSRelay(t *testing.T, level int) (*wsRelay, *metrics.Metrics) { + t.Helper() + cfg := adaptiveConfig() // enforce=false, adaptive on, profiling on + m := metrics.New() + p := &Proxy{logger: audit.NewNop(), metrics: m} + + smCfg := &config.SessionProfiling{ + Enabled: true, + MaxSessions: 100, + DomainBurst: 100, + WindowMinutes: 5, + SessionTTLMinutes: 30, + CleanupIntervalSeconds: 600, + } + sm := NewSessionManager(smCfg, nil, m) + t.Cleanup(sm.Close) + rec := sm.GetOrCreate(adaptiveSessionKeyLoopback) + escalateRec(rec, level) + + relay := &wsRelay{ + clientConn: discardConn{}, + upstreamConn: discardConn{}, + proxy: p, + cfg: cfg, + rec: rec, + agent: agentAnonymous, + clientIP: adaptiveSessionKeyLoopback, + requestID: "req-adaptive", + targetURL: "ws://example.com/socket", + } + return relay, m +} + +// adaptiveUpgradeTotal sums the pipelock_adaptive_upgrades_total counter across +// all label sets in the registry. Used to confirm an adaptive upgrade fired +// without depending on the exact from/to/level labels. +func adaptiveUpgradeTotal(t *testing.T, m *metrics.Metrics) float64 { + t.Helper() + families, err := m.Registry().Gather() + if err != nil { + t.Fatalf("gather metrics: %v", err) + } + var total float64 + for _, fam := range families { + if fam.GetName() != "pipelock_adaptive_upgrades_total" { + continue + } + for _, mm := range fam.GetMetric() { + total += mm.GetCounter().GetValue() + } + } + return total +} + +// TestWSRelay_HandleClientTextFindings_DLPAdaptiveUpgrade drives the relay DLP +// warn->block adaptive-upgrade branch: a non-critical DLP match in audit mode +// is upgraded to block because the session is escalated. +func TestWSRelay_HandleClientTextFindings_DLPAdaptiveUpgrade(t *testing.T) { + relay, m := newAdaptiveWSRelay(t, 2) + + blocked := relay.handleClientTextFindings(audit.NewNop(), []scanner.TextDLPMatch{{ + PatternName: "Generic Secret", + Severity: config.SeverityMedium, + }}, nil) + + if !blocked { + t.Fatal("expected escalated DLP finding to block in audit mode") + } + if got := adaptiveUpgradeTotal(t, m); got != 1 { + t.Errorf("adaptive upgrade count = %v, want 1", got) + } +} + +// TestWSRelay_HandleClientTextFindings_AddressAdaptiveUpgrade drives the relay +// address-protection warn->block adaptive-upgrade branch. +func TestWSRelay_HandleClientTextFindings_AddressAdaptiveUpgrade(t *testing.T) { + relay, m := newAdaptiveWSRelay(t, 2) + + blocked := relay.handleClientTextFindings(audit.NewNop(), nil, []addressprotect.Finding{{ + Hit: addressprotect.Hit{ + Chain: "eth", + Normalized: "0x742d35cc6634c0532925a3b844bc454e4438f44e", + }, + Verdict: addressprotect.VerdictLookalike, + Action: config.ActionWarn, + MatchedAddr: "0x742d35...38f44e", + Explanation: "lookalike payout address", + }}) + + if !blocked { + t.Fatal("expected escalated address finding to block in audit mode") + } + if got := adaptiveUpgradeTotal(t, m); got != 1 { + t.Errorf("adaptive upgrade count = %v, want 1", got) + } +} + +// TestWSRelay_HandleClientMessageBodyResult_AdaptiveUpgrade drives the relay +// request-body warn->block adaptive-upgrade branch. +func TestWSRelay_HandleClientMessageBodyResult_AdaptiveUpgrade(t *testing.T) { + relay, m := newAdaptiveWSRelay(t, 2) + + blocked := relay.handleClientMessageBodyResult(audit.NewNop(), []byte(`{"prompt":"flagged"}`), BodyScanResult{ + Clean: false, + Action: config.ActionWarn, + Reason: "body finding", + DLPMatches: []scanner.TextDLPMatch{{ + PatternName: "Generic Secret", + Severity: config.SeverityMedium, + }}, + }) + + if !blocked { + t.Fatal("expected escalated body finding to block in audit mode") + } + if got := adaptiveUpgradeTotal(t, m); got != 1 { + t.Errorf("adaptive upgrade count = %v, want 1", got) + } +}