diff --git a/internal/proxy/cee.go b/internal/proxy/cee.go index 89dea378..53cc4a47 100644 --- a/internal/proxy/cee.go +++ b/internal/proxy/cee.go @@ -26,10 +26,7 @@ import ( // CeeSessionKey builds a consistent session identity for cross-request // exfiltration detection. Exported for use by the session reset admin API. func CeeSessionKey(agent, clientIP string) string { - if agent != "" && agent != agentAnonymous { - return agent + "|" + clientIP - } - return clientIP + return sessionKeyFor(agent, clientIP) } // maxCaptureSessionKeyLen aliases the writer-side ceiling so the diff --git a/internal/proxy/forward.go b/internal/proxy/forward.go index a7fa5183..ab2232fb 100644 --- a/internal/proxy/forward.go +++ b/internal/proxy/forward.go @@ -317,10 +317,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { baseAction := config.ActionWarn effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) p.logger.LogBlockedDetail(targetCtx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) @@ -343,10 +340,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { // block_all enforcement: deny ALL traffic (including clean) when the // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) p.metrics.RecordTunnelBlocked(agentLabel) @@ -619,11 +613,7 @@ func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) { // escalation level lookups instead of a stale snapshot from sr.Level. var interceptRec session.Recorder if sm := p.sessionMgrPtr.Load(); sm != nil { - interceptSessionKey := clientIP - if agent != "" && agent != agentAnonymous { - interceptSessionKey = agent + "|" + clientIP - } - interceptRec = sm.GetOrCreate(interceptSessionKey) + interceptRec = sm.GetOrCreate(sessionKeyFor(agent, clientIP)) } if err := interceptTunnel(interceptCtx, interceptConn, &InterceptContext{ TargetHost: host, @@ -931,10 +921,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { baseAction := config.ActionWarn effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) p.logger.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) @@ -958,10 +945,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { // block_all enforcement: deny ALL traffic (including clean) when the // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) p.metrics.RecordBlocked(r.URL.Hostname(), "session_deny", time.Since(start), agentLabel) @@ -1228,10 +1212,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { action = decide.UpgradeAction(action, sr.Level, &cfg.AdaptiveEnforcement) } if action != originalBodyAction { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), originalBodyAction, action, scannerLabel, clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(originalBodyAction, action, session.EscalationLabel(sr.Level)) } @@ -2105,10 +2086,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { if forwardRec != nil && !fwdRespExempt { action = decide.UpgradeAction(action, forwardRec.EscalationLevel(), &cfg.AdaptiveEnforcement) if action != originalAction { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) p.logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(forwardRec.EscalationLevel()), originalAction, action, "response_scan", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(forwardRec.EscalationLevel())) } @@ -2127,10 +2105,7 @@ func (p *Proxy) handleForwardHTTP(w http.ResponseWriter, r *http.Request) { // Exempt domains skip scoring — findings are logged but don't escalate. if !fwdRespExempt { if sm := p.sessionMgrPtr.Load(); sm != nil && cfg.AdaptiveEnforcement.Enabled { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) sess := sm.GetOrCreate(sessionKey) decide.RecordSignal(sess, session.SignalStrip, decide.EscalationParams{ Threshold: cfg.AdaptiveEnforcement.EscalationThreshold, diff --git a/internal/proxy/intercept.go b/internal/proxy/intercept.go index a55bd96b..735e97e6 100644 --- a/internal/proxy/intercept.go +++ b/internal/proxy/intercept.go @@ -120,10 +120,7 @@ func interceptRecordSignal(ic *InterceptContext, sig session.SignalType) { if !ic.Config.AdaptiveEnforcement.Enabled { return } - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) var m *metrics.Metrics if ic.Proxy != nil { m = ic.Proxy.metrics @@ -606,10 +603,7 @@ func newInterceptHandler( baseAction := config.ActionWarn effectiveAction := decide.UpgradeAction(baseAction, recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), baseAction, effectiveAction, urlResult.Scanner, ic.ClientIP, ic.RequestID) if ic.Proxy != nil { ic.Proxy.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(recEscalationLevel(ic.Recorder))) @@ -826,10 +820,7 @@ func newInterceptHandler( action = decide.UpgradeAction(action, recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) } if action != originalBodyAction { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), originalBodyAction, action, scannerLabel, ic.ClientIP, ic.RequestID) if ic.Proxy != nil { ic.Proxy.metrics.RecordAdaptiveUpgrade(originalBodyAction, action, session.EscalationLabel(recEscalationLevel(ic.Recorder))) @@ -1102,10 +1093,7 @@ func newInterceptHandler( interceptMetrics = ic.Proxy.metrics } if changed, fromLabel, toLabel := trySessionRecovery(ic.Recorder, &ic.Config.AdaptiveEnforcement, interceptMetrics); changed { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) if ic.Logger != nil { ic.Logger.LogAdaptiveEscalation(sessionKey, fromLabel, toLabel, ic.ClientIP, ic.RequestID, ic.Recorder.ThreatScore()) } @@ -1114,10 +1102,7 @@ func newInterceptHandler( // block_all enforcement: deny ALL traffic (including clean) when the // session is at an escalation level with block_all=true. if ic.Recorder != nil && decide.UpgradeAction("", recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), "", config.ActionBlock, "session_deny", ic.ClientIP, ic.RequestID) if ic.Proxy != nil { ic.Proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(recEscalationLevel(ic.Recorder))) @@ -1723,10 +1708,7 @@ func newInterceptHandler( action = decide.UpgradeAction(action, recEscalationLevel(ic.Recorder), &ic.Config.AdaptiveEnforcement) } if action != originalAction { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) ic.Logger.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(recEscalationLevel(ic.Recorder)), originalAction, action, "response_scan", ic.ClientIP, ic.RequestID) if ic.Proxy != nil { ic.Proxy.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(recEscalationLevel(ic.Recorder))) @@ -1772,10 +1754,7 @@ func newInterceptHandler( ceeSM = ic.Proxy.sessionMgrPtr.Load() } if ceeSM != nil { - sessionKey := ic.ClientIP - if ic.Agent != "" && ic.Agent != agentAnonymous { - sessionKey = ic.Agent + "|" + ic.ClientIP - } + sessionKey := sessionKeyFor(ic.Agent, ic.ClientIP) sess := ceeSM.GetOrCreate(sessionKey) var stripMetrics *metrics.Metrics if ic.Proxy != nil { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 2ed84fd2..bd546749 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2096,12 +2096,7 @@ func (p *Proxy) recordSessionActivityWithUserAgent(opts sessionActivityOptions) return SessionResult{} } - // Build session key: agent|clientIP when agent is known, else just clientIP. - key := clientIP - if agent != "" && agent != agentAnonymous { - key = agent + "|" + clientIP - } - + key := sessionKeyFor(agent, clientIP) sess := sm.GetOrCreate(key) // On-entry de-escalation: recover sessions stuck at block_all. @@ -2487,10 +2482,7 @@ func (p *Proxy) recordShieldIntervention(summary *receipt.ShieldSummary, cfg *co if sm == nil { return } - sessionKey := clientIP - if agent := actx.Agent(); agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(actx.Agent(), clientIP) sess := sm.GetOrCreate(sessionKey) for i := 0; i < signals; i++ { if decide.RecordSignal(sess, session.SignalShieldRewrite, decide.EscalationParams{ @@ -3134,11 +3126,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { // RecordClean at the end when no finding was detected. var fetchRec session.Recorder if sm := p.sessionMgrPtr.Load(); sm != nil { - fetchSessionKey := clientIP - if agent != "" && agent != agentAnonymous { - fetchSessionKey = agent + "|" + clientIP - } - fetchRec = sm.GetOrCreate(fetchSessionKey) + fetchRec = sm.GetOrCreate(sessionKeyFor(agent, clientIP)) } fetchTaint := evaluateHTTPTaint(cfg, fetchRec, http.MethodGet, parsed) @@ -3224,10 +3212,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { baseAction := config.ActionWarn effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) log.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) @@ -3309,10 +3294,7 @@ func (p *Proxy) handleFetch(w http.ResponseWriter, r *http.Request) { // session is at an escalation level with block_all=true. UpgradeAction // with an empty base action returns "block" only when block_all is set. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) p.emitReceipt(receipt.EmitOpts{ @@ -4252,10 +4234,7 @@ func (p *Proxy) filterAndActOnResponseScan( action = decide.UpgradeAction(action, sessionLevel, &cfg.AdaptiveEnforcement) } if action != originalAction { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sessionLevel), originalAction, action, "response_scan", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(sessionLevel)) } @@ -4268,10 +4247,7 @@ func (p *Proxy) filterAndActOnResponseScan( return } if sm := p.sessionMgrPtr.Load(); sm != nil && cfg.AdaptiveEnforcement.Enabled { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) sess := sm.GetOrCreate(sessionKey) decide.RecordSignal(sess, sig, decide.EscalationParams{ Threshold: cfg.AdaptiveEnforcement.EscalationThreshold, diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 2b426bca..5e539b6a 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -5104,10 +5104,7 @@ func TestProxy_RegisterAndShutdownAgentServers(t *testing.T) { // recording enough block signals to cross the threshold repeatedly. // Returns the session key used. func escalateSession(sm *SessionManager, clientIP, agent string, threshold float64, targetLevel int) string { - key := clientIP - if agent != "" && agent != agentAnonymous { - key = agent + "|" + clientIP - } + key := sessionKeyFor(agent, clientIP) sess := sm.GetOrCreate(key) // Each escalation doubles the threshold. We need to accumulate enough // points to cross the threshold 'targetLevel' times. diff --git a/internal/proxy/session.go b/internal/proxy/session.go index 742d56a8..96398ffa 100644 --- a/internal/proxy/session.go +++ b/internal/proxy/session.go @@ -1303,10 +1303,7 @@ func (sm *SessionManager) AdaptiveStatus() AdaptiveStatus { } func (sm *SessionManager) AdaptiveWhoami(clientIP, agent string) AdaptiveWhoami { - key := clientIP - if agent != "" && agent != agentAnonymous { - key = agent + "|" + clientIP - } + key := sessionKeyFor(agent, clientIP) out := AdaptiveWhoami{ ClientIP: clientIP, Agent: agent, diff --git a/internal/proxy/sessionkey.go b/internal/proxy/sessionkey.go new file mode 100644 index 00000000..b108ea10 --- /dev/null +++ b/internal/proxy/sessionkey.go @@ -0,0 +1,17 @@ +package proxy + +// sessionKeyFor builds the per-session key used for adaptive-enforcement +// tracking and audit correlation. A named agent is namespaced ahead of its +// client IP so that two agents sharing one client IP are tracked as distinct +// sessions. An unnamed or anonymous agent keys on the client IP alone. +// +// This is the single source of truth for session-key construction. Every +// transport (fetch, forward, CONNECT, WebSocket, TLS intercept) must build +// the key the same way, otherwise adaptive escalation and de-escalation would +// track different keys for the same logical session. +func sessionKeyFor(agent, clientIP string) string { + if agent == "" || agent == agentAnonymous { + return clientIP + } + return agent + "|" + clientIP +} diff --git a/internal/proxy/sessionkey_test.go b/internal/proxy/sessionkey_test.go new file mode 100644 index 00000000..26412d53 --- /dev/null +++ b/internal/proxy/sessionkey_test.go @@ -0,0 +1,56 @@ +package proxy + +import "testing" + +func TestSessionKeyFor(t *testing.T) { + tests := []struct { + name string + agent string + clientIP string + want string + }{ + { + name: "named agent namespaces ahead of ip", + agent: "agent-a", + clientIP: "10.0.0.1", + want: "agent-a|10.0.0.1", + }, + { + name: "empty agent keys on ip alone", + agent: "", + clientIP: "10.0.0.1", + want: "10.0.0.1", + }, + { + name: "anonymous agent keys on ip alone", + agent: agentAnonymous, + clientIP: "10.0.0.1", + want: "10.0.0.1", + }, + { + name: "two named agents on same ip stay distinct", + agent: "agent-b", + clientIP: "10.0.0.1", + want: "agent-b|10.0.0.1", + }, + { + name: "named agent with empty ip", + agent: "agent-a", + clientIP: "", + want: "agent-a|", + }, + { + name: "empty agent and empty ip", + agent: "", + clientIP: "", + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := sessionKeyFor(tt.agent, tt.clientIP); got != tt.want { + t.Errorf("sessionKeyFor(%q, %q) = %q, want %q", tt.agent, tt.clientIP, got, tt.want) + } + }) + } +} diff --git a/internal/proxy/websocket.go b/internal/proxy/websocket.go index 509809cf..62b4a5cc 100644 --- a/internal/proxy/websocket.go +++ b/internal/proxy/websocket.go @@ -92,10 +92,7 @@ func (r *wsRelay) recordSignal(sig session.SignalType, log *audit.Logger) { if r.rec == nil || !r.cfg.AdaptiveEnforcement.Enabled { return } - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) decide.RecordSignal(r.rec, sig, decide.EscalationParams{ Threshold: r.cfg.AdaptiveEnforcement.EscalationThreshold, Logger: log, @@ -299,10 +296,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { baseAction := config.ActionWarn effectiveAction := decide.UpgradeAction(baseAction, sr.Level, &cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), baseAction, effectiveAction, result.Scanner, clientIP, requestID) p.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(sr.Level)) log.LogBlockedDetail(actx, result.Scanner, result.Reason+" (escalated)", auditDetailFromResult(result)) @@ -347,10 +341,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { // block_all enforcement: deny ALL traffic (including clean) when the // session is at an escalation level with block_all=true. if sr.Level > 0 && decide.UpgradeAction("", sr.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(sr.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(sr.Level)) p.metrics.RecordWSBlocked() @@ -465,10 +456,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { // Re-check block_all after header DLP may have escalated the session. if cfg.AdaptiveEnforcement.Enabled && headerSR.Level > 0 && decide.UpgradeAction("", headerSR.Level, &cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(headerSR.Level), "", config.ActionBlock, "session_deny", clientIP, requestID) p.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(headerSR.Level)) p.metrics.RecordWSBlocked() @@ -571,10 +559,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { // escalation changes during long-lived WS connections take effect. var wsRec session.Recorder if sm := p.sessionMgrPtr.Load(); sm != nil { - sessionKey := clientIP - if agent != "" && agent != agentAnonymous { - sessionKey = agent + "|" + clientIP - } + sessionKey := sessionKeyFor(agent, clientIP) wsRec = sm.GetOrCreate(sessionKey) } @@ -1185,10 +1170,7 @@ func (r *wsRelay) handleClientTextFindings(log *audit.Logger, dlpMatches []scann effectiveAction := decide.UpgradeAction(baseAction, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) if effectiveAction == config.ActionBlock { r.recordSignal(session.SignalBlock, log) - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), baseAction, effectiveAction, audit.ScannerDLP, r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade(baseAction, effectiveAction, session.EscalationLabel(r.escalationLevel())) reason := fmt.Sprintf("DLP match: %s (escalated)", strings.Join(names, ", ")) @@ -1241,10 +1223,7 @@ func (r *wsRelay) handleClientTextFindings(log *audit.Logger, dlpMatches []scann originalAddrAction := addrAction addrAction = decide.UpgradeAction(addrAction, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) if addrAction != originalAddrAction { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalAddrAction, addrAction, scannerLabelAddressProtection, r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade(originalAddrAction, addrAction, session.EscalationLabel(r.escalationLevel())) } @@ -1509,10 +1488,7 @@ func (r *wsRelay) handleClientMessageBodyResult(log *audit.Logger, bodyBytes []b originalAction := action action = decide.UpgradeAction(action, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) if action != originalAction { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalAction, action, scannerLabel, r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade(originalAction, action, session.EscalationLabel(r.escalationLevel())) } @@ -1643,10 +1619,7 @@ func (r *wsRelay) clientToUpstream(ctx context.Context, cancel context.CancelFun // On-entry de-escalation for long-lived WebSocket connections. if changed, fromLabel, toLabel := trySessionRecovery(r.rec, &r.cfg.AdaptiveEnforcement, r.proxy.metrics); changed { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveEscalation(sessionKey, fromLabel, toLabel, r.clientIP, r.requestID, r.rec.ThreatScore()) } @@ -1654,10 +1627,7 @@ func (r *wsRelay) clientToUpstream(ctx context.Context, cancel context.CancelFun // block_all=true, close the WebSocket immediately. This prevents // clean frames from flowing after escalation during long-lived connections. if decide.UpgradeAction("", r.escalationLevel(), &r.cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), "", config.ActionBlock, "session_deny", r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(r.escalationLevel())) r.terminalOnce.Do(func() { @@ -1978,20 +1948,14 @@ func (r *wsRelay) upstreamToClient(ctx context.Context, cancel context.CancelFun // On-entry de-escalation for long-lived WebSocket connections. if changed, fromLabel, toLabel := trySessionRecovery(r.rec, &r.cfg.AdaptiveEnforcement, r.proxy.metrics); changed { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveEscalation(sessionKey, fromLabel, toLabel, r.clientIP, r.requestID, r.rec.ThreatScore()) } // block_all check: if the session has escalated to a level with // block_all=true, close the WebSocket immediately. if decide.UpgradeAction("", r.escalationLevel(), &r.cfg.AdaptiveEnforcement) == config.ActionBlock { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), "", config.ActionBlock, "session_deny", r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade("", config.ActionBlock, session.EscalationLabel(r.escalationLevel())) r.terminalOnce.Do(func() { @@ -2202,10 +2166,7 @@ func (r *wsRelay) upstreamToClient(ctx context.Context, cancel context.CancelFun wsAction = decide.UpgradeAction(wsAction, r.escalationLevel(), &r.cfg.AdaptiveEnforcement) } if wsAction != originalWSAction { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) log.LogAdaptiveUpgrade(sessionKey, session.EscalationLabel(r.escalationLevel()), originalWSAction, wsAction, "response_scan", r.clientIP, r.requestID) r.proxy.metrics.RecordAdaptiveUpgrade(originalWSAction, wsAction, session.EscalationLabel(r.escalationLevel())) } @@ -2234,10 +2195,7 @@ func (r *wsRelay) upstreamToClient(ctx context.Context, cancel context.CancelFun // Exempt domains skip scoring — findings are logged but don't escalate. if !wsRespExempt { if sm := r.proxy.sessionMgrPtr.Load(); sm != nil && r.cfg.AdaptiveEnforcement.Enabled { - sessionKey := r.clientIP - if r.agent != "" && r.agent != agentAnonymous { - sessionKey = r.agent + "|" + r.clientIP - } + sessionKey := sessionKeyFor(r.agent, r.clientIP) sess := sm.GetOrCreate(sessionKey) decide.RecordSignal(sess, session.SignalStrip, decide.EscalationParams{ Threshold: r.cfg.AdaptiveEnforcement.EscalationThreshold,