diff --git a/enterprise/conductor/messages.go b/enterprise/conductor/messages.go index 07aef6ce..d7195cb4 100644 --- a/enterprise/conductor/messages.go +++ b/enterprise/conductor/messages.go @@ -63,6 +63,7 @@ var ( ErrWrongKeyPurpose = errors.New("conductor signature key_purpose mismatch") ErrThresholdRequired = errors.New("conductor signature threshold not met") ErrForbiddenLicenseField = errors.New("policy bundle contains forbidden license field") + ErrForbiddenBundleSection = errors.New("policy bundle contains a config section not permitted in a signed bundle") ErrInvalidValidityWindow = errors.New("invalid conductor validity window") ErrInvalidSequenceRange = errors.New("invalid conductor sequence range") ErrInvalidState = errors.New("invalid conductor state") @@ -81,6 +82,60 @@ var ( ErrInvalidDroppedAccounting = errors.New("invalid conductor dropped accounting") ) +// allowedPolicyBundleSections is the default-deny allowlist of top-level config +// sections a signed policy bundle may carry in its config_yaml. It contains ONLY +// enforcement-policy surfaces — what pipelock decides about a scanned request. +// +// Everything not listed is rejected so a signed bundle cannot reconfigure +// operational/infrastructure surfaces remotely: listeners, telemetry/emit, +// logging, sentry, kill switch, flight recorder, the conductor control plane +// itself, license, or mediation-envelope signing. It also rejects sections that +// mix enforcement with a local trust boundary, identity, certificate, routing, +// or OS-isolation concern — `internal`/`ssrf`/`dns`/`trusted_domains` (SSRF and +// DNS trust), `agents` (per-agent identity/credentials), `tls_interception` +// (MITM certs/passthrough), and `sandbox` (OS isolation) — until those are split +// into narrower policy-only surfaces. Keeping them operator-local means a bundle +// cannot loosen SSRF, add a trusted domain, retarget DNS, push agent identity, +// change TLS interception, or weaken sandboxing. +// +// Default-deny is deliberate: a config section added in a future release is +// automatically rejected from bundles until it is consciously added here. +var allowedPolicyBundleSections = map[string]struct{}{ + "version": {}, // schema/version metadata; harmless + "mode": {}, + "enforce": {}, + "explain_blocks": {}, + "api_allowlist": {}, + "suppress": {}, + "dlp": {}, + "canary_tokens": {}, + "response_scanning": {}, + "mcp_input_scanning": {}, + "mcp_tool_scanning": {}, + "mcp_tool_policy": {}, + "git_protection": {}, + "session_profiling": {}, + "adaptive_enforcement": {}, + "mcp_session_binding": {}, + "request_body_scanning": {}, + "request_policy": {}, + "tool_chain_detection": {}, + "cross_request_detection": {}, + "address_protection": {}, + "seed_phrase_detection": {}, + "rules": {}, + "mcp_binary_integrity": {}, + "mcp_tool_provenance": {}, + "behavioral_baseline": {}, + "airlock": {}, + "browser_shield": {}, + "media_policy": {}, + "taint": {}, + "redaction": {}, + "learn": {}, + "learn_lock": {}, +} + var forbiddenLicenseFields = map[string]struct{}{ "license_key": {}, "license_file": {}, @@ -482,6 +537,9 @@ func (b PolicyBundle) Validate() error { if err := rejectLicenseFields(b.Payload.ConfigYAML); err != nil { return err } + if err := rejectDisallowedBundleSections(b.Payload.ConfigYAML); err != nil { + return err + } for _, rb := range b.Payload.RuleBundles { if err := rb.Validate(); err != nil { return err @@ -1294,6 +1352,42 @@ func walkRejectLicenseFieldsAt(n *yaml.Node, path string) error { return nil } +// rejectDisallowedBundleSections enforces the default-deny +// allowedPolicyBundleSections allowlist over the top-level keys of a policy +// bundle's config_yaml. Any top-level section not in the allowlist is rejected. +// Only the top level is checked: the allowlist governs which config SURFACES a +// bundle may touch, not values within an allowed surface. An empty/blank +// config_yaml is left to the caller's existing non-empty check. +func rejectDisallowedBundleSections(configYAML string) error { + dec := yaml.NewDecoder(bytes.NewReader([]byte(configYAML))) + var doc yaml.Node + if err := dec.Decode(&doc); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return fmt.Errorf("%w: parse config payload: %w", ErrForbiddenBundleSection, err) + } + if err := rejectExtraYAMLDocuments(dec); err != nil { + return err + } + if len(doc.Content) == 0 { + return nil + } + root := doc.Content[0] + if root.Kind != yaml.MappingNode { + // A non-mapping top-level document carries no config sections; nothing + // to allow or reject here (other validators handle shape). + return nil + } + for i := 0; i+1 < len(root.Content); i += 2 { + key := root.Content[i].Value + if _, ok := allowedPolicyBundleSections[key]; !ok { + return fmt.Errorf("%w: %q", ErrForbiddenBundleSection, key) + } + } + return nil +} + func rejectExtraYAMLDocuments(dec *yaml.Decoder) error { var extra yaml.Node err := dec.Decode(&extra) diff --git a/enterprise/conductor/messages_test.go b/enterprise/conductor/messages_test.go index a8c17c55..83025c6b 100644 --- a/enterprise/conductor/messages_test.go +++ b/enterprise/conductor/messages_test.go @@ -313,7 +313,7 @@ func TestRemoteKillMessage_VerifySignaturesThreshold(t *testing.T) { func testPolicyBundle() PolicyBundle { payload := PolicyBundlePayload{ - ConfigYAML: "mode: strict\nagents:\n claude-code:\n mode: strict\n", + ConfigYAML: "mode: strict\nmcp_tool_policy:\n enabled: true\n", RuleBundles: []RuleBundleRef{{ Name: "official", Version: "2026.05.23", diff --git a/enterprise/conductor/policysync/poller.go b/enterprise/conductor/policysync/poller.go new file mode 100644 index 00000000..ae399860 --- /dev/null +++ b/enterprise/conductor/policysync/poller.go @@ -0,0 +1,244 @@ +//go:build enterprise + +// Licensed under the Elastic License 2.0. See enterprise/LICENSE. + +// Package policysync polls a Conductor leader for the latest signed policy +// bundle scoped to this follower and applies it through the apply boundary. +// +// It mirrors the remote-kill poller (enterprise/conductor/emergency): a single +// goroutine ticks on an interval, issues a GET against the leader over the +// shared mTLS client, and hands the decoded message to an applier. The applier +// here is the follower's policy-bundle apply boundary, which verifies the +// bundle signature, enforces monotonic versioning, and triggers a config +// reload. A rejected bundle leaves the running config untouched (fail closed) +// and is retried on the next interval. +package policysync + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net/http" + "net/url" + "sync" + "time" + + "github.com/luckyPipewrench/pipelock/enterprise/conductor" +) + +const ( + // LatestPolicyBundlePath mirrors controlplane.LatestPolicyBundlePath. It is + // duplicated rather than imported to keep the follower-side poller free of a + // compile-time dependency on the leader-side controlplane package. + LatestPolicyBundlePath = "/api/v1/conductor/policy/latest" + + defaultPollInterval = 30 * time.Second + // defaultResponseBytes caps the policy-bundle response body. A bundle wraps + // a config payload (conductor.MaxConfigYAMLBytes = 256 KiB) plus metadata, + // signatures, and rule-bundle refs; 1 MiB leaves generous headroom while + // bounding memory against a hostile or misbehaving leader. + defaultResponseBytes = 1 << 20 + + pollerUserAgent = "pipelock-conductor-policy-sync/1.0" + pollerAcceptedCT = "application/json" +) + +var ( + ErrPollerRequired = errors.New("conductor policy sync poller required") + ErrPollResponse = errors.New("invalid conductor policy bundle poll response") +) + +// HTTPDoer is the subset of *http.Client the poller needs. The runtime supplies +// the shared mTLS client; tests supply a stub. +type HTTPDoer interface { + Do(*http.Request) (*http.Response, error) +} + +// Applier applies a fetched policy bundle and returns an error if the bundle is +// rejected (bad signature, audience mismatch, version regression, reload +// failure). On error the poller leaves the running config untouched and retries +// on the next interval. +type Applier interface { + ApplyPolicyBundle(conductor.PolicyBundle) error +} + +// ApplierFunc adapts a plain function to the Applier interface. +type ApplierFunc func(conductor.PolicyBundle) error + +func (f ApplierFunc) ApplyPolicyBundle(b conductor.PolicyBundle) error { return f(b) } + +type PollerConfig struct { + BaseURL string + Client HTTPDoer + Applier Applier + PollInterval time.Duration + MaxResponseBytes int64 + Logger *slog.Logger +} + +type Poller struct { + client HTTPDoer + applier Applier + endpoint string + pollInterval time.Duration + maxResponseBytes int64 + logger *slog.Logger + + // mu guards lastETag. Run drives PollOnce from a single goroutine, but + // PollOnce is exported and may be called concurrently by tests, so the + // validator cache is mutex-guarded to stay race-clean. + mu sync.Mutex + lastETag string +} + +func NewPoller(cfg PollerConfig) (*Poller, error) { + if cfg.Client == nil { + return nil, fmt.Errorf("%w: HTTP client", ErrPollerRequired) + } + if cfg.Applier == nil { + return nil, fmt.Errorf("%w: applier", ErrPollerRequired) + } + endpoint, err := policyEndpoint(cfg.BaseURL) + if err != nil { + return nil, err + } + interval := cfg.PollInterval + if interval == 0 { + interval = defaultPollInterval + } + if interval < time.Second { + return nil, fmt.Errorf("conductor policy bundle poll interval must be >= 1s, got %s", interval) + } + maxBytes := cfg.MaxResponseBytes + if maxBytes == 0 { + maxBytes = defaultResponseBytes + } + if maxBytes <= 0 { + return nil, fmt.Errorf("conductor policy bundle max response bytes must be > 0, got %d", maxBytes) + } + return &Poller{ + client: cfg.Client, + applier: cfg.Applier, + endpoint: endpoint, + pollInterval: interval, + maxResponseBytes: maxBytes, + logger: cfg.Logger, + }, nil +} + +func (p *Poller) Run(ctx context.Context) error { + if p == nil { + return ErrPollerRequired + } + for { + if err := p.PollOnce(ctx); err != nil { + if errors.Is(err, context.Canceled) || (errors.Is(err, context.DeadlineExceeded) && ctx.Err() != nil) { + return err + } + p.logPollError(err) + } + timer := time.NewTimer(p.pollInterval) + select { + case <-ctx.Done(): + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + return ctx.Err() + case <-timer.C: + } + } +} + +func (p *Poller) PollOnce(ctx context.Context) error { + if p == nil { + return ErrPollerRequired + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, p.endpoint, nil) + if err != nil { + return fmt.Errorf("create conductor policy bundle poll request: %w", err) + } + req.Header.Set("Accept", pollerAcceptedCT) + req.Header.Set("User-Agent", pollerUserAgent) + p.mu.Lock() + etag := p.lastETag + p.mu.Unlock() + if etag != "" { + req.Header.Set("If-None-Match", etag) + } + resp, err := p.client.Do(req) + if err != nil { + return fmt.Errorf("poll conductor policy bundle: %w", err) + } + defer func() { _ = resp.Body.Close() }() + switch resp.StatusCode { + case http.StatusNotModified, http.StatusNoContent: + // 304: follower already holds the latest. 204: no bundle published for + // this follower's scope. Either way there is nothing to apply. + return nil + case http.StatusOK: + default: + return fmt.Errorf("%w: status=%d", ErrPollResponse, resp.StatusCode) + } + body, err := io.ReadAll(io.LimitReader(resp.Body, p.maxResponseBytes+1)) + if err != nil { + return fmt.Errorf("read conductor policy bundle response: %w", err) + } + if int64(len(body)) > p.maxResponseBytes { + return fmt.Errorf("%w: body exceeds %d bytes", ErrPollResponse, p.maxResponseBytes) + } + var bundle conductor.PolicyBundle + decoder := json.NewDecoder(bytes.NewReader(body)) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&bundle); err != nil { + return fmt.Errorf("%w: decode: %w", ErrPollResponse, err) + } + if err := decoder.Decode(&struct{}{}); !errors.Is(err, io.EOF) { + return fmt.Errorf("%w: trailing JSON document", ErrPollResponse) + } + if err := p.applier.ApplyPolicyBundle(bundle); err != nil { + return fmt.Errorf("apply conductor policy bundle: %w", err) + } + // Advance the cached validator only after a successful apply, so a transient + // apply failure is retried on the next poll rather than being masked by a + // 304 (the leader would otherwise short-circuit the follower's recovery). + p.mu.Lock() + p.lastETag = resp.Header.Get("ETag") + p.mu.Unlock() + return nil +} + +func (p *Poller) logPollError(err error) { + if p.logger == nil { + return + } + p.logger.Warn("conductor_policy_bundle_poll_error", + slog.String("event", "conductor_policy_bundle_poll_error"), + slog.String("error", err.Error()), + ) +} + +func policyEndpoint(rawBaseURL string) (string, error) { + u, err := url.Parse(rawBaseURL) + if err != nil { + return "", fmt.Errorf("parse conductor policy bundle base URL: %w", err) + } + if u.Scheme != "https" || u.Host == "" { + return "", fmt.Errorf("conductor policy bundle base URL must be https with a host") + } + if u.User != nil || u.RawQuery != "" || u.Fragment != "" { + return "", fmt.Errorf("conductor policy bundle base URL must not include userinfo, query, or fragment") + } + if u.Path != "" && u.Path != "/" { + return "", fmt.Errorf("conductor policy bundle base URL must not include a path component") + } + u.Path = LatestPolicyBundlePath + u.RawPath = "" + return u.String(), nil +} diff --git a/enterprise/conductor/policysync/poller_test.go b/enterprise/conductor/policysync/poller_test.go new file mode 100644 index 00000000..4bfd88bf --- /dev/null +++ b/enterprise/conductor/policysync/poller_test.go @@ -0,0 +1,436 @@ +//go:build enterprise + +// Licensed under the Elastic License 2.0. See enterprise/LICENSE. + +package policysync + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/luckyPipewrench/pipelock/enterprise/conductor" +) + +const ( + testBaseURL = "https://leader.example:8895" + testETag1 = `"a1b2c3"` + testETag2 = `"d4e5f6"` +) + +// step scripts one stub response: status code, ETag header, and body. A nil +// body with readErr set makes the response body fail on Read. +type step struct { + status int + etag string + body []byte + readErr bool +} + +// errReader fails on Read to exercise the response-body read-error path. +type errReader struct{} + +func (errReader) Read([]byte) (int, error) { return 0, errors.New("read failure") } + +// stubDoer serves a scripted sequence of steps (one per request) and records +// the requests it received so tests can assert on headers (e.g. If-None-Match). +// The response is constructed inside Do — not pre-built and stored — so the +// bodyclose linter correctly attributes the close to the code under test (the +// poller defers resp.Body.Close()). +type stubDoer struct { + mu sync.Mutex + steps []step + err error // non-nil => every Do returns this transport error + reqs []*http.Request + i int +} + +func (s *stubDoer) Do(r *http.Request) (*http.Response, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.reqs = append(s.reqs, r) + if s.err != nil { + return nil, s.err + } + idx := s.i + s.i++ + if idx >= len(s.steps) { + idx = len(s.steps) - 1 + } + st := s.steps[idx] + h := http.Header{} + if st.etag != "" { + h.Set("ETag", st.etag) + } + var body io.ReadCloser + if st.readErr { + body = io.NopCloser(errReader{}) + } else { + body = io.NopCloser(strings.NewReader(string(st.body))) + } + return &http.Response{StatusCode: st.status, Header: h, Body: body, Request: r}, nil +} + +func (s *stubDoer) reqCount() int { + s.mu.Lock() + defer s.mu.Unlock() + return len(s.reqs) +} + +func (s *stubDoer) inmAt(i int) string { + s.mu.Lock() + defer s.mu.Unlock() + if i < 0 || i >= len(s.reqs) { + return "" + } + return s.reqs[i].Header.Get("If-None-Match") +} + +func (s *stubDoer) lastReq() *http.Request { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.reqs) == 0 { + return nil + } + return s.reqs[len(s.reqs)-1] +} + +type stubApplier struct { + mu sync.Mutex + got []conductor.PolicyBundle + err error +} + +func (a *stubApplier) ApplyPolicyBundle(b conductor.PolicyBundle) error { + a.mu.Lock() + defer a.mu.Unlock() + a.got = append(a.got, b) + return a.err +} + +func (a *stubApplier) calls() int { + a.mu.Lock() + defer a.mu.Unlock() + return len(a.got) +} + +func mkBundleJSON(t *testing.T) []byte { + t.Helper() + raw, err := json.Marshal(conductor.PolicyBundle{BundleID: "b1", Version: 1}) + if err != nil { + t.Fatalf("marshal bundle: %v", err) + } + return raw +} + +func newTestPoller(t *testing.T, doer HTTPDoer, applier Applier) *Poller { + t.Helper() + p, err := NewPoller(PollerConfig{ + BaseURL: testBaseURL, + Client: doer, + Applier: applier, + PollInterval: time.Second, + }) + if err != nil { + t.Fatalf("NewPoller: %v", err) + } + return p +} + +func TestNewPoller_Validation(t *testing.T) { + okApplier := ApplierFunc(func(conductor.PolicyBundle) error { return nil }) + okDoer := &stubDoer{steps: []step{{status: http.StatusNoContent}}} + tests := []struct { + name string + cfg PollerConfig + wantErr error + }{ + {name: "nil client", cfg: PollerConfig{BaseURL: testBaseURL, Applier: okApplier}, wantErr: ErrPollerRequired}, + {name: "nil applier", cfg: PollerConfig{BaseURL: testBaseURL, Client: okDoer}, wantErr: ErrPollerRequired}, + {name: "interval below floor", cfg: PollerConfig{BaseURL: testBaseURL, Client: okDoer, Applier: okApplier, PollInterval: time.Millisecond}}, + {name: "negative max response bytes", cfg: PollerConfig{BaseURL: testBaseURL, Client: okDoer, Applier: okApplier, MaxResponseBytes: -1}}, + {name: "non-https base url", cfg: PollerConfig{BaseURL: "http://leader.example:8895", Client: okDoer, Applier: okApplier}}, + {name: "base url with path", cfg: PollerConfig{BaseURL: "https://leader.example:8895/extra", Client: okDoer, Applier: okApplier}}, + {name: "base url with query", cfg: PollerConfig{BaseURL: "https://leader.example:8895?x=1", Client: okDoer, Applier: okApplier}}, + {name: "base url empty host", cfg: PollerConfig{BaseURL: "https://", Client: okDoer, Applier: okApplier}}, + {name: "base url unparseable", cfg: PollerConfig{BaseURL: "://bad", Client: okDoer, Applier: okApplier}}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + p, err := NewPoller(tc.cfg) + if err == nil { + t.Fatalf("expected error, got nil poller=%v", p) + } + if tc.wantErr != nil && !errors.Is(err, tc.wantErr) { + t.Fatalf("want errors.Is %v, got %v", tc.wantErr, err) + } + }) + } +} + +func TestNewPoller_DefaultsAndEndpoint(t *testing.T) { + p, err := NewPoller(PollerConfig{ + BaseURL: testBaseURL, + Client: &stubDoer{}, + Applier: ApplierFunc(func(conductor.PolicyBundle) error { return nil }), + }) + if err != nil { + t.Fatalf("NewPoller: %v", err) + } + if p.pollInterval != defaultPollInterval { + t.Errorf("pollInterval = %s, want %s", p.pollInterval, defaultPollInterval) + } + if p.maxResponseBytes != defaultResponseBytes { + t.Errorf("maxResponseBytes = %d, want %d", p.maxResponseBytes, defaultResponseBytes) + } + if want := testBaseURL + LatestPolicyBundlePath; p.endpoint != want { + t.Errorf("endpoint = %q, want %q", p.endpoint, want) + } +} + +func TestPollOnce_StatusHandling(t *testing.T) { + tests := []struct { + name string + status int + body []byte + applierErr error + wantErr bool + wantErrType error + wantApplied int + }{ + {name: "200 applies bundle", status: http.StatusOK, body: mkBundleJSON(t), wantApplied: 1}, + {name: "204 no bundle", status: http.StatusNoContent, wantApplied: 0}, + {name: "304 not modified", status: http.StatusNotModified, wantApplied: 0}, + {name: "500 server error", status: http.StatusInternalServerError, wantErr: true, wantErrType: ErrPollResponse, wantApplied: 0}, + {name: "403 forbidden", status: http.StatusForbidden, wantErr: true, wantErrType: ErrPollResponse, wantApplied: 0}, + {name: "200 invalid json", status: http.StatusOK, body: []byte("{not json"), wantErr: true, wantErrType: ErrPollResponse, wantApplied: 0}, + {name: "200 unknown field", status: http.StatusOK, body: []byte(`{"bundle_id":"b1","totally_unknown":true}`), wantErr: true, wantErrType: ErrPollResponse, wantApplied: 0}, + // A trailing JSON document is rejected BEFORE apply — a hostile leader + // must not be able to smuggle a second payload past the strict decoder. + {name: "200 trailing document", status: http.StatusOK, body: append(mkBundleJSON(t), []byte("\n{}")...), wantErr: true, wantErrType: ErrPollResponse, wantApplied: 0}, + {name: "200 applier rejects", status: http.StatusOK, body: mkBundleJSON(t), applierErr: errors.New("bad signature"), wantErr: true, wantApplied: 1}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + doer := &stubDoer{steps: []step{{status: tc.status, etag: testETag1, body: tc.body}}} + applier := &stubApplier{err: tc.applierErr} + p := newTestPoller(t, doer, applier) + err := p.PollOnce(context.Background()) + if tc.wantErr && err == nil { + t.Fatal("expected error, got nil") + } + if !tc.wantErr && err != nil { + t.Fatalf("unexpected error: %v", err) + } + if tc.wantErrType != nil && !errors.Is(err, tc.wantErrType) { + t.Fatalf("want errors.Is %v, got %v", tc.wantErrType, err) + } + if got := applier.calls(); got != tc.wantApplied { + t.Errorf("applier calls = %d, want %d", got, tc.wantApplied) + } + }) + } +} + +func TestPollOnce_ETagAdvancesOnlyAfterSuccess(t *testing.T) { + // First poll: 200 with ETag1 -> applied, ETag cached. + // Second poll: must send If-None-Match: ETag1, and 304 skips apply. + doer := &stubDoer{steps: []step{ + {status: http.StatusOK, etag: testETag1, body: mkBundleJSON(t)}, + {status: http.StatusNotModified, etag: testETag1}, + }} + applier := &stubApplier{} + p := newTestPoller(t, doer, applier) + + if err := p.PollOnce(context.Background()); err != nil { + t.Fatalf("first poll: %v", err) + } + if got := doer.inmAt(0); got != "" { + t.Errorf("first request should not send If-None-Match, got %q", got) + } + if err := p.PollOnce(context.Background()); err != nil { + t.Fatalf("second poll: %v", err) + } + if got := doer.lastReq().Header.Get("If-None-Match"); got != testETag1 { + t.Errorf("second request If-None-Match = %q, want %q", got, testETag1) + } + if applier.calls() != 1 { + t.Errorf("applier should be called once (304 skips), got %d", applier.calls()) + } +} + +func TestPollOnce_ETagNotAdvancedWhenApplierRejects(t *testing.T) { + // Apply fails on the first poll -> ETag must NOT advance, so the second + // poll re-fetches (no If-None-Match) instead of being masked by a 304. + doer := &stubDoer{steps: []step{ + {status: http.StatusOK, etag: testETag1, body: mkBundleJSON(t)}, + {status: http.StatusOK, etag: testETag2, body: mkBundleJSON(t)}, + }} + applier := &stubApplier{err: errors.New("reject")} + p := newTestPoller(t, doer, applier) + + if err := p.PollOnce(context.Background()); err == nil { + t.Fatal("first poll: expected apply error") + } + if err := p.PollOnce(context.Background()); err == nil { + t.Fatal("second poll: expected apply error") + } + if got := doer.lastReq().Header.Get("If-None-Match"); got != "" { + t.Errorf("after a rejected apply the poller must re-fetch without If-None-Match, got %q", got) + } + if applier.calls() != 2 { + t.Errorf("applier should be retried, calls = %d, want 2", applier.calls()) + } +} + +func TestPollOnce_BodyExceedsCap(t *testing.T) { + doer := &stubDoer{steps: []step{{status: http.StatusOK, etag: testETag1, body: []byte(strings.Repeat("x", 64))}}} + applier := &stubApplier{} + p, err := NewPoller(PollerConfig{ + BaseURL: testBaseURL, + Client: doer, + Applier: applier, + PollInterval: time.Second, + MaxResponseBytes: 16, + }) + if err != nil { + t.Fatalf("NewPoller: %v", err) + } + if err := p.PollOnce(context.Background()); !errors.Is(err, ErrPollResponse) { + t.Fatalf("want ErrPollResponse, got %v", err) + } + if applier.calls() != 0 { + t.Errorf("oversized body must not be applied, calls = %d", applier.calls()) + } +} + +func TestPollOnce_BodyReadError(t *testing.T) { + doer := &stubDoer{steps: []step{{status: http.StatusOK, readErr: true}}} + applier := &stubApplier{} + p := newTestPoller(t, doer, applier) + if err := p.PollOnce(context.Background()); err == nil { + t.Fatal("expected body read error") + } + if applier.calls() != 0 { + t.Errorf("read error must not apply, calls = %d", applier.calls()) + } +} + +func TestPollOnce_TransportError(t *testing.T) { + doer := &stubDoer{err: errors.New("dial tcp: connection refused")} + applier := &stubApplier{} + p := newTestPoller(t, doer, applier) + if err := p.PollOnce(context.Background()); err == nil { + t.Fatal("expected transport error") + } + if applier.calls() != 0 { + t.Errorf("transport error must not apply, calls = %d", applier.calls()) + } +} + +func TestPollOnce_RequestHeaders(t *testing.T) { + doer := &stubDoer{steps: []step{{status: http.StatusNoContent}}} + p := newTestPoller(t, doer, &stubApplier{}) + if err := p.PollOnce(context.Background()); err != nil { + t.Fatalf("PollOnce: %v", err) + } + req := doer.lastReq() + if req.Method != http.MethodGet { + t.Errorf("method = %s, want GET", req.Method) + } + if got := req.Header.Get("Accept"); got != pollerAcceptedCT { + t.Errorf("Accept = %q, want %q", got, pollerAcceptedCT) + } + if got := req.Header.Get("User-Agent"); got != pollerUserAgent { + t.Errorf("User-Agent = %q, want %q", got, pollerUserAgent) + } + if !strings.HasSuffix(req.URL.Path, LatestPolicyBundlePath) { + t.Errorf("path = %q, want suffix %q", req.URL.Path, LatestPolicyBundlePath) + } +} + +func TestRun_StopsOnContextCancel(t *testing.T) { + doer := &stubDoer{steps: []step{{status: http.StatusNoContent}}} + p := newTestPoller(t, doer, &stubApplier{}) + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Run(ctx) }() + cancel() + select { + case err := <-done: + if !errors.Is(err, context.Canceled) { + t.Fatalf("Run returned %v, want context.Canceled", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Run did not stop after cancel") + } +} + +func TestRun_LogsPollErrorAndContinues(t *testing.T) { + // First poll fails (500); the poller logs and keeps looping. Exercises the + // logPollError continue-branch inside Run. + doer := &stubDoer{steps: []step{ + {status: http.StatusInternalServerError}, + {status: http.StatusNoContent}, + }} + p, err := NewPoller(PollerConfig{ + BaseURL: testBaseURL, + Client: doer, + Applier: &stubApplier{}, + PollInterval: time.Second, + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + }) + if err != nil { + t.Fatalf("NewPoller: %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- p.Run(ctx) }() + deadline := time.After(5 * time.Second) + for doer.reqCount() < 1 { + select { + case <-deadline: + t.Fatal("no poll occurred") + case <-time.After(5 * time.Millisecond): + } + } + cancel() + select { + case err := <-done: + if !errors.Is(err, context.Canceled) { + t.Fatalf("Run = %v, want context.Canceled", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Run did not stop") + } +} + +func TestRun_NilPoller(t *testing.T) { + var p *Poller + if err := p.Run(context.Background()); !errors.Is(err, ErrPollerRequired) { + t.Fatalf("nil Run = %v, want ErrPollerRequired", err) + } + if err := p.PollOnce(context.Background()); !errors.Is(err, ErrPollerRequired) { + t.Fatalf("nil PollOnce = %v, want ErrPollerRequired", err) + } +} + +func TestLogPollError(t *testing.T) { + var logs bytes.Buffer + p := &Poller{logger: slog.New(slog.NewJSONHandler(&logs, nil))} + p.logPollError(errors.New("boom")) + got := logs.String() + if !strings.Contains(got, "conductor_policy_bundle_poll_error") || !strings.Contains(got, "boom") { + t.Fatalf("logs = %s, want poll error event", got) + } + // A nil logger must be a safe no-op. + (&Poller{}).logPollError(errors.New("ignored")) +} diff --git a/internal/cli/runtime/conductor.go b/internal/cli/runtime/conductor.go index a3422a3f..a1add8ca 100644 --- a/internal/cli/runtime/conductor.go +++ b/internal/cli/runtime/conductor.go @@ -24,6 +24,7 @@ import ( "github.com/luckyPipewrench/pipelock/enterprise/conductor/applycache" "github.com/luckyPipewrench/pipelock/enterprise/conductor/auditbatcher" "github.com/luckyPipewrench/pipelock/enterprise/conductor/emergency" + "github.com/luckyPipewrench/pipelock/enterprise/conductor/policysync" "github.com/luckyPipewrench/pipelock/internal/cliutil" "github.com/luckyPipewrench/pipelock/internal/config" "github.com/luckyPipewrench/pipelock/internal/killswitch" @@ -188,6 +189,60 @@ func buildConductorRemoteKillPoller(cfg *config.Config, ks emergency.KillSwitchS }) } +// buildConductorBundlePoller wires the follower-side policy-bundle poller. It is +// a method so the applier closure can call s.ApplyConductorPolicyBundle, which +// owns the verify -> reload -> activate boundary. Unlike the remote-kill poller, +// the bundle poller ALWAYS builds the real trust resolver: applying a policy +// bundle mutates the running config, so it must be signature-verified regardless +// of honor_remote_kill_switch. A missing/unloadable roster fails closed here. +func (s *Server) buildConductorBundlePoller(cfg *config.Config, logWriter io.Writer) (*policysync.Poller, error) { + if cfg == nil || !cfg.Conductor.Enabled { + return nil, nil + } + client, err := newConductorMTLSClient(cfg.Conductor) + if err != nil { + return nil, err + } + resolver, err := buildConductorTrustResolver(cfg.Conductor, time.Now) + if err != nil { + return nil, fmt.Errorf("building conductor policy bundle trust resolver: %w", err) + } + interval, err := time.ParseDuration(cfg.Conductor.PollInterval) + if err != nil { + return nil, fmt.Errorf("parsing conductor policy bundle poll interval: %w", err) + } + if logWriter == nil { + logWriter = io.Discard + } + logger := slog.New(slog.NewJSONHandler(logWriter, &slog.HandlerOptions{Level: slog.LevelInfo})). + With("service", "pipelock", "component", "conductor_policy_bundle") + applier := policysync.ApplierFunc(func(bundle conductor.PolicyBundle) error { + _, applyErr := s.ApplyConductorPolicyBundle(bundle, ConductorApplyOptions{Resolver: resolver}) + return applyErr + }) + return policysync.NewPoller(policysync.PollerConfig{ + BaseURL: cfg.Conductor.ConductorURL, + Client: client, + Applier: applier, + PollInterval: interval, + Logger: logger, + }) +} + +// initConductorBundlePoller stores the policy-bundle poller on the Server so the +// lifecycle can launch its Run loop alongside the remote-kill poller and audit +// transport. Mirrors initConductorRemoteKill. +func (s *Server) initConductorBundlePoller(cfg *config.Config, stderr io.Writer) error { + poller, err := s.buildConductorBundlePoller(cfg, stderr) + if err != nil { + return err + } + if poller != nil { + s.conductorBundle = poller + } + return nil +} + func buildConductorTrustResolver(cfg config.Conductor, now func() time.Time) (conductor.SignatureKeyResolver, error) { if now == nil { now = time.Now diff --git a/internal/cli/runtime/conductor_license_test.go b/internal/cli/runtime/conductor_license_test.go index 27020a8c..7f56c5bb 100644 --- a/internal/cli/runtime/conductor_license_test.go +++ b/internal/cli/runtime/conductor_license_test.go @@ -144,6 +144,11 @@ func conductorLicenseGateConfigYAML(t *testing.T) string { " org_id: o\n" + " fleet_id: f\n" + " instance_id: i\n" + + // A format-valid pinned fingerprint is required whenever conductor.enabled + // (independent of honor_remote_kill_switch). These gate tests fail at the + // license / enterprise-build gate before any roster file is loaded, so a + // well-formed placeholder fingerprint is sufficient to pass config validation. + " trust_roster_root_fingerprint: sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n" + " server_ca_file: " + strconv.Quote(caPath) + "\n" + " client_cert_path: " + strconv.Quote(clientCertPath) + "\n" + " client_key_path: " + strconv.Quote(clientKeyPath) + "\n" + diff --git a/internal/cli/runtime/conductor_stub.go b/internal/cli/runtime/conductor_stub.go index 65fb04c6..3a5b8958 100644 --- a/internal/cli/runtime/conductor_stub.go +++ b/internal/cli/runtime/conductor_stub.go @@ -57,3 +57,13 @@ func (s *Server) initConductorProducer(cfg *config.Config, _ *metrics.Metrics, _ } return nil } + +// initConductorBundlePoller is a no-op in the Apache-only build. See +// initConductorApplyAndAudit for the rationale. +func (s *Server) initConductorBundlePoller(cfg *config.Config, _ io.Writer) error { + s.touchConductorCoreFields() + if cfg != nil && cfg.Conductor.Enabled { + return errConductorEnterpriseBuildRequired + } + return nil +} diff --git a/internal/cli/runtime/conductor_test.go b/internal/cli/runtime/conductor_test.go index 026a8758..eb359a17 100644 --- a/internal/cli/runtime/conductor_test.go +++ b/internal/cli/runtime/conductor_test.go @@ -293,6 +293,132 @@ func TestBuildConductorRemoteKillPollerHonorsDisableWithoutRoster(t *testing.T) } } +// TestBuildConductorBundlePollerRejectsBadRosterEvenWithHonorFalse proves the +// policy-bundle poller fails closed when the trust roster cannot be loaded, +// REGARDLESS of honor_remote_kill_switch. Unlike the remote-kill poller (which +// installs a reject-all resolver and keeps running when honor=false so it can +// log visible rejections), the bundle poller must have a real verified trust +// root before it can apply any signed bundle — so a missing/unreadable roster +// is a hard startup error. +func TestBuildConductorBundlePollerRejectsBadRosterEvenWithHonorFalse(t *testing.T) { + dir := t.TempDir() + clientPEM, clientKeyPEM := testTLSClientCert(t) + caPath := filepath.Join(dir, "boss-ca.pem") + clientCertPath := filepath.Join(dir, "client.crt") + clientKeyPath := filepath.Join(dir, "client.key") + writePrivateTestFile(t, caPath, clientPEM) + writePrivateTestFile(t, clientCertPath, clientPEM) + writePrivateTestFile(t, clientKeyPath, clientKeyPEM) + + s := &Server{} + _, err := s.buildConductorBundlePoller(&config.Config{ + Conductor: config.Conductor{ + Enabled: true, + ConductorURL: "https://conductor.example", + OrgID: "org-main", + FleetID: "prod", + InstanceID: "pl-prod-1", + TrustRosterPath: filepath.Join(dir, "missing-roster.json"), + TrustRosterRootFingerprint: strings.Repeat("a", 64), + ServerCAFile: caPath, + ClientCertPath: clientCertPath, + ClientKeyPath: clientKeyPath, + BundleCacheDir: filepath.Join(dir, "bundles"), + DurableAuditQueueDir: filepath.Join(dir, "audit-queue"), + PollInterval: "30s", + HonorRemoteKillSwitch: false, // the crux: honor=false must NOT skip roster verification + }, + }, io.Discard) + if err == nil { + t.Fatal("buildConductorBundlePoller() with honor=false + missing roster: want error, got nil") + } + if !strings.Contains(err.Error(), "trust resolver") && !strings.Contains(err.Error(), "trust roster") { + t.Fatalf("error = %v, want trust roster/resolver failure", err) + } +} + +// TestBuildConductorBundlePollerDisabled confirms the poller is a no-op (nil, +// nil) when conductor is not enabled. +func TestBuildConductorBundlePollerDisabled(t *testing.T) { + s := &Server{} + poller, err := s.buildConductorBundlePoller(&config.Config{Conductor: config.Conductor{Enabled: false}}, io.Discard) + if err != nil { + t.Fatalf("disabled buildConductorBundlePoller() error = %v", err) + } + if poller != nil { + t.Fatal("disabled buildConductorBundlePoller() poller = non-nil, want nil") + } +} + +// TestBuildConductorBundlePollerErrorPaths covers the remaining fail-closed +// branches: an unreadable mTLS client certificate, and (with valid mTLS + a real +// signed roster) an unparseable poll interval. The trust-resolver branch is +// covered by TestBuildConductorBundlePollerRejectsBadRosterEvenWithHonorFalse. +func TestBuildConductorBundlePollerErrorPaths(t *testing.T) { + dir := t.TempDir() + clientPEM, clientKeyPEM := testTLSClientCert(t) + caPath := filepath.Join(dir, "boss-ca.pem") + clientCertPath := filepath.Join(dir, "client.crt") + clientKeyPath := filepath.Join(dir, "client.key") + rosterPath := filepath.Join(dir, "trust-roster.json") + bundlePub, _, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("GenerateKey: %v", err) + } + rootFingerprint := writeRuntimeTrustRoster(t, rosterPath, bundlePub, "policy-signer-1", signing.PurposePolicyBundleSigning) + writePrivateTestFile(t, caPath, clientPEM) + writePrivateTestFile(t, clientCertPath, clientPEM) + writePrivateTestFile(t, clientKeyPath, clientKeyPEM) + + base := config.Conductor{ + Enabled: true, + ConductorURL: "https://conductor.example", + OrgID: "org-main", + FleetID: "prod", + InstanceID: "pl-prod-1", + TrustRosterPath: rosterPath, + TrustRosterRootFingerprint: rootFingerprint, + ServerCAFile: caPath, + ClientCertPath: clientCertPath, + ClientKeyPath: clientKeyPath, + BundleCacheDir: filepath.Join(dir, "bundles"), + DurableAuditQueueDir: filepath.Join(dir, "audit-queue"), + PollInterval: "30s", + HonorRemoteKillSwitch: false, + } + + t.Run("mtls_client_error", func(t *testing.T) { + cfg := base + cfg.ClientCertPath = filepath.Join(dir, "missing-client.crt") + s := &Server{} + if _, err := s.buildConductorBundlePoller(&config.Config{Conductor: cfg}, io.Discard); err == nil || + !strings.Contains(err.Error(), "mTLS client") { + t.Fatalf("error = %v, want mTLS client failure", err) + } + }) + + t.Run("poll_interval_error", func(t *testing.T) { + cfg := base + cfg.PollInterval = "not-a-duration" + s := &Server{} + if _, err := s.buildConductorBundlePoller(&config.Config{Conductor: cfg}, io.Discard); err == nil || + !strings.Contains(err.Error(), "parsing conductor policy bundle poll interval") { + t.Fatalf("error = %v, want poll interval parse failure", err) + } + }) + + t.Run("valid_builds_poller", func(t *testing.T) { + s := &Server{} + poller, err := s.buildConductorBundlePoller(&config.Config{Conductor: base}, nil) + if err != nil { + t.Fatalf("valid config: %v", err) + } + if poller == nil { + t.Fatal("poller = nil, want a poller for enabled conductor") + } + }) +} + func TestBuildConductorRemoteKillPollerRejectsInvalidConfig(t *testing.T) { dir := t.TempDir() clientPEM, clientKeyPEM := testTLSClientCert(t) @@ -438,18 +564,27 @@ func TestBuildConductorApplyCacheRejectsInvalidDir(t *testing.T) { } } +// TestNewServer_WiresConductorBundlePoller proves the policy-bundle poller is +// constructed and stored on the Server when conductor.enabled with a valid +// signed roster, so server_lifecycle launches its poll loop alongside the audit +// transport and remote-kill poller. +func TestNewServer_WiresConductorBundlePoller(t *testing.T) { + s, _ := newConductorApplyTestServer(t) + if s.conductorBundle == nil { + t.Fatal("conductor policy-bundle poller should be wired when conductor.enabled") + } +} + func TestApplyConductorPolicyBundleReloadsAndActivates(t *testing.T) { - s, signer, recorderKeyPath, recorderDir := newConductorApplyTestServer(t) + s, signer := newConductorApplyTestServer(t) + // Enforcement-only bundle: policy bundles may carry only enforcement-policy + // sections (default-deny allowlist), so flight_recorder/conductor/etc. are + // NOT included here. The follower's existing flight_recorder + conductor + // config must survive the reload for the apply to succeed. bundle := signedRuntimePolicyBundle(t, signer, "bundle-1", 1, "", strings.Join([]string{ "mode: strict", "api_allowlist:", " - api.example.com", - "flight_recorder:", - " enabled: true", - " dir: " + strconv.Quote(recorderDir), - " checkpoint_interval: 1", - " sign_checkpoints: true", - " signing_key_path: " + strconv.Quote(recorderKeyPath), "", }, "\n")) @@ -521,7 +656,7 @@ func (s runtimePolicySigner) resolver() conductor.SignatureKeyResolver { } } -func newConductorApplyTestServer(t *testing.T) (*Server, runtimePolicySigner, string, string) { +func newConductorApplyTestServer(t *testing.T) (*Server, runtimePolicySigner) { t.Helper() // conductor.enabled triggers the fleet-license gate; install a real // Enterprise token for the test so the production gate path is exercised. @@ -548,7 +683,11 @@ func newConductorApplyTestServer(t *testing.T) (*Server, runtimePolicySigner, st caPath := filepath.Join(tmp, "boss-ca.pem") clientCertPath := filepath.Join(tmp, "client.crt") clientKeyPath := filepath.Join(tmp, "client.key") - writePrivateTestFile(t, trustPath, []byte(`{"keys":[]}`)) + // A real signed roster is mandatory whenever conductor.enabled, independent + // of honor_remote_kill_switch: the policy-bundle poller must verify signed + // bundles against a pinned trust root before applying them. + bundleSigner := newRuntimePolicySigner(t) + rootFingerprint := writeRuntimeTrustRoster(t, trustPath, bundleSigner.pub, bundleSigner.id, signing.PurposePolicyBundleSigning) writePrivateTestFile(t, caPath, clientPEM) writePrivateTestFile(t, clientCertPath, clientPEM) writePrivateTestFile(t, clientKeyPath, clientKeyPEM) @@ -569,6 +708,7 @@ func newConductorApplyTestServer(t *testing.T) (*Server, runtimePolicySigner, st " fleet_id: prod", " instance_id: pl-prod-1", " trust_roster_path: " + strconv.Quote(trustPath), + " trust_roster_root_fingerprint: " + strconv.Quote(rootFingerprint), " server_ca_file: " + strconv.Quote(caPath), " client_cert_path: " + strconv.Quote(clientCertPath), " client_key_path: " + strconv.Quote(clientKeyPath), @@ -589,7 +729,7 @@ func newConductorApplyTestServer(t *testing.T) (*Server, runtimePolicySigner, st if server.conductorApply == nil { t.Fatal("conductor apply cache should be initialized") } - return server, newRuntimePolicySigner(t), keyPath, recorderDir + return server, bundleSigner } func signedRuntimePolicyBundle(t *testing.T, signer runtimePolicySigner, id string, version uint64, previousHash, configYAML string) conductor.PolicyBundle { diff --git a/internal/cli/runtime/server.go b/internal/cli/runtime/server.go index 05f49d2f..85882e5f 100644 --- a/internal/cli/runtime/server.go +++ b/internal/cli/runtime/server.go @@ -112,6 +112,7 @@ type Server struct { conductorAuditQueue any conductorAudit conductorRunner conductorRemoteKill conductorRunner + conductorBundle conductorRunner conductorProducer conductorCloser approver *hitl.Approver @@ -335,6 +336,10 @@ func NewServer(opts ServerOpts) (*Server, error) { s.cleanup() return nil, err } + if err := s.initConductorBundlePoller(cfg, opts.Stderr); err != nil { + s.cleanup() + return nil, err + } var proxyOpts []proxy.Option s.hasApprover = cfg.ResponseScanning.Action == config.ActionAsk diff --git a/internal/cli/runtime/server_conductor_test.go b/internal/cli/runtime/server_conductor_test.go index a46c0a11..de01d7ef 100644 --- a/internal/cli/runtime/server_conductor_test.go +++ b/internal/cli/runtime/server_conductor_test.go @@ -44,7 +44,11 @@ func TestNewServer_ConductorAuditProducerFromConfig(t *testing.T) { caPath := filepath.Join(tmp, "boss-ca.pem") clientCertPath := filepath.Join(tmp, "client.crt") clientKeyPath := filepath.Join(tmp, "client.key") - writePrivateTestFile(t, trustPath, []byte(`{"keys":[]}`)) + // conductor.enabled requires a real signed roster + pinned fingerprint even + // with honor_remote_kill_switch:false — the policy-bundle poller verifies + // signed bundles against the pinned trust root. + bundleSigner := newRuntimePolicySigner(t) + rootFingerprint := writeRuntimeTrustRoster(t, trustPath, bundleSigner.pub, bundleSigner.id, signing.PurposePolicyBundleSigning) writePrivateTestFile(t, caPath, clientPEM) writePrivateTestFile(t, clientCertPath, clientPEM) writePrivateTestFile(t, clientKeyPath, clientKeyPEM) @@ -64,6 +68,7 @@ func TestNewServer_ConductorAuditProducerFromConfig(t *testing.T) { " fleet_id: prod", " instance_id: pl-prod-1", " trust_roster_path: " + strconv.Quote(trustPath), + " trust_roster_root_fingerprint: " + strconv.Quote(rootFingerprint), " server_ca_file: " + strconv.Quote(caPath), " client_cert_path: " + strconv.Quote(clientCertPath), " client_key_path: " + strconv.Quote(clientKeyPath), @@ -126,6 +131,13 @@ func (serverRemoteKillNoopClient) Do(req *http.Request) (*http.Response, error) }, nil } +type serverConductorBlockingRunner struct{} + +func (serverConductorBlockingRunner) Run(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() +} + func TestServer_StartRunsConductorRemoteKillPoller(t *testing.T) { s, buf := newTestServer(t, func(o *ServerOpts) { o.Listen = newServerTestFreePort(t) @@ -165,3 +177,34 @@ func TestServer_StartRunsConductorRemoteKillPoller(t *testing.T) { t.Fatal("Start did not return within 5s of Shutdown") } } + +func TestServer_StartRunsConductorBundlePoller(t *testing.T) { + s, buf := newTestServer(t, func(o *ServerOpts) { + o.Listen = newServerTestFreePort(t) + o.ListenChanged = true + }) + s.conductorBundle = serverConductorBlockingRunner{} + s.cfg.Conductor.ConductorURL = "https://conductor.example" + + errCh := make(chan error, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + errCh <- s.Start(ctx) + }() + + waitForServerCancel(t, s) + waitForServerOutput(t, buf, "Conductor: policy bundle polling enabled -> https://conductor.example") + + if err := s.Shutdown(context.Background()); err != nil { + t.Fatalf("Shutdown: %v", err) + } + select { + case err := <-errCh: + if err != nil { + t.Fatalf("Start returned error after Shutdown: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Start did not return within 5s of Shutdown") + } +} diff --git a/internal/cli/runtime/server_lifecycle.go b/internal/cli/runtime/server_lifecycle.go index df40256a..ca5436fc 100644 --- a/internal/cli/runtime/server_lifecycle.go +++ b/internal/cli/runtime/server_lifecycle.go @@ -206,6 +206,9 @@ func (s *Server) Start(ctx context.Context) error { if s.conductorRemoteKill != nil { _, _ = fmt.Fprintf(s.opts.Stderr, " Conductor: remote kill polling enabled -> %s\n", RedactEndpoint(cfg.Conductor.ConductorURL)) } + if s.conductorBundle != nil { + _, _ = fmt.Fprintf(s.opts.Stderr, " Conductor: policy bundle polling enabled -> %s\n", RedactEndpoint(cfg.Conductor.ConductorURL)) + } if s.opts.CaptureOutput != "" { if s.opts.CaptureDuration > 0 { _, _ = fmt.Fprintf(s.opts.Stderr, " Capture: %s (duration: %s)\n", s.opts.CaptureOutput, s.opts.CaptureDuration) @@ -225,7 +228,7 @@ func (s *Server) Start(ctx context.Context) error { } var conductorWG sync.WaitGroup - if s.conductorAudit != nil || s.conductorRemoteKill != nil { + if s.conductorAudit != nil || s.conductorRemoteKill != nil || s.conductorBundle != nil { if s.conductorRemoteKill != nil { conductorWG.Add(1) go func() { @@ -259,7 +262,24 @@ func (s *Server) Start(ctx context.Context) error { } }() } - if s.conductorAudit != nil || s.conductorRemoteKill != nil { + if s.conductorBundle != nil { + conductorWG.Add(1) + go func() { + defer conductorWG.Done() + defer func() { + if r := recover(); r != nil { + _, _ = fmt.Fprintf(s.opts.Stderr, "pipelock: conductor policy bundle poller panic: %v\n", r) + cancel() + } + }() + if err := s.conductorBundle.Run(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + s.logger.LogError(audit.NewResourceLogContext("conductor_policy_bundle_poller", cfg.Conductor.ConductorURL), err) + _, _ = fmt.Fprintf(s.opts.Stderr, "pipelock: conductor policy bundle poller stopped: %v\n", err) + cancel() + } + }() + } + if s.conductorAudit != nil || s.conductorRemoteKill != nil || s.conductorBundle != nil { defer func() { cancel() conductorWG.Wait() diff --git a/internal/cli/runtime/server_reload.go b/internal/cli/runtime/server_reload.go index b9371182..f1176692 100644 --- a/internal/cli/runtime/server_reload.go +++ b/internal/cli/runtime/server_reload.go @@ -89,18 +89,28 @@ func (s *Server) Reload(newCfg *config.Config) (err error) { s.logger.LogConfigReload("ignored", "conductor settings restart-only", attemptedHash) newCfg.Conductor = oldCfg.Conductor } - // Block signing key rotation via reload. The receipt chain - // state is anchored to the current signing key; rotation - // mid-chain causes tail-signature verification to fail on - // resume, which in turn drops receipt persistence for every - // subsequent action. Proper chain rollover with a key-rotation - // marker is tracked for v2.2.1. Until then, preserve the old - // key and warn — operators must restart pipelock to rotate the - // receipt signing key. - if oldCfg.FlightRecorder.SigningKeyPath != newCfg.FlightRecorder.SigningKeyPath { - _, _ = fmt.Fprintf(s.opts.Stderr, "WARNING: config reload: flight_recorder.signing_key_path changed from %q to %q — receipt chain cannot rotate at runtime, ignoring (restart required)\n", - oldCfg.FlightRecorder.SigningKeyPath, newCfg.FlightRecorder.SigningKeyPath) - newCfg.FlightRecorder.SigningKeyPath = oldCfg.FlightRecorder.SigningKeyPath + // Block flight_recorder changes via reload. The recorder (and its + // receipt/audit chain) is built once at Start; reload swaps config and + // scanner but never rebuilds the recorder, so any flight_recorder change + // would leave the live config disagreeing with the running recorder. + // Signing-key rotation is the sharpest case — the receipt chain is + // anchored to the current key, and rotating mid-chain breaks tail- + // signature verification on resume — but every field is restart-only for + // the same build-once reason. Preserve the whole block and warn. + // + // This also keeps Conductor policy-bundle apply working: a signed bundle + // carries enforcement-only config (flight_recorder is not an allowlisted + // bundle section), so the bundle's loaded config omits flight_recorder. + // Preserving the follower's existing block means conductor.enabled — which + // requires a signed flight recorder — still validates after the apply. + if !reflect.DeepEqual(oldCfg.FlightRecorder, newCfg.FlightRecorder) { + if oldCfg.FlightRecorder.SigningKeyPath != newCfg.FlightRecorder.SigningKeyPath { + _, _ = fmt.Fprintf(s.opts.Stderr, "WARNING: config reload: flight_recorder.signing_key_path changed from %q to %q — receipt chain cannot rotate at runtime, ignoring (restart required)\n", + oldCfg.FlightRecorder.SigningKeyPath, newCfg.FlightRecorder.SigningKeyPath) + } else { + _, _ = fmt.Fprintf(s.opts.Stderr, "WARNING: config reload: flight_recorder settings changed — recorder is built at startup and cannot rebind at runtime, ignoring (restart required)\n") + } + newCfg.FlightRecorder = oldCfg.FlightRecorder } // Block file_sentry changes via reload. The watcher is built // once at Start from the startup snapshot; reloading would diff --git a/internal/config/conductor.go b/internal/config/conductor.go index d7a19f00..d61a9864 100644 --- a/internal/config/conductor.go +++ b/internal/config/conductor.go @@ -74,13 +74,18 @@ func (c *Config) validateConductor(warnings *[]Warning) error { if strings.TrimSpace(c.FlightRecorder.SigningKeyPath) == "" { return fmt.Errorf("flight_recorder.signing_key_path required when conductor.enabled is true") } - if cfg.HonorRemoteKillSwitch { - if strings.TrimSpace(cfg.TrustRosterRootFingerprint) == "" { - return fmt.Errorf("conductor.trust_roster_root_fingerprint required when conductor.enabled and honor_remote_kill_switch are true") - } - if _, _, err := signing.ParseFingerprint(cfg.TrustRosterRootFingerprint); err != nil { - return fmt.Errorf("conductor.trust_roster_root_fingerprint: %w", err) - } + // A pinned trust-roster root fingerprint is mandatory whenever conductor is + // enabled, INDEPENDENT of honor_remote_kill_switch. The follower verifies + // signed policy bundles (and, when honored, remote-kill messages) against + // the roster rooted at this fingerprint; without it the follower could apply + // unverified signed control-plane input. honor_remote_kill_switch only + // governs whether remote-kill STATE is applied, not whether trust material + // is required. + if strings.TrimSpace(cfg.TrustRosterRootFingerprint) == "" { + return fmt.Errorf("conductor.trust_roster_root_fingerprint required when conductor.enabled is true") + } + if _, _, err := signing.ParseFingerprint(cfg.TrustRosterRootFingerprint); err != nil { + return fmt.Errorf("conductor.trust_roster_root_fingerprint: %w", err) } for field, value := range map[string]string{ "conductor.trust_roster_path": cfg.TrustRosterPath, diff --git a/internal/config/conductor_test.go b/internal/config/conductor_test.go index 0c584da2..500865a0 100644 --- a/internal/config/conductor_test.go +++ b/internal/config/conductor_test.go @@ -61,6 +61,56 @@ func TestValidateConductor_Enabled(t *testing.T) { } } +// TestValidateConductor_RequiresFingerprintRegardlessOfHonor locks in the +// contract that a pinned trust-roster root fingerprint is mandatory whenever +// conductor.enabled, INDEPENDENT of honor_remote_kill_switch. The honor flag +// only governs whether remote-kill STATE is applied; it does not relax the +// trust-material requirement, because the policy-bundle poller verifies signed +// bundles against the pinned root even when remote kill is not honored. +func TestValidateConductor_RequiresFingerprintRegardlessOfHonor(t *testing.T) { + tests := []struct { + name string + honor bool + fingerprint string + want string + }{ + {name: "honor_false_missing_fingerprint", honor: false, fingerprint: "", want: "conductor.trust_roster_root_fingerprint required"}, + {name: "honor_true_missing_fingerprint", honor: true, fingerprint: "", want: "conductor.trust_roster_root_fingerprint required"}, + {name: "honor_false_bad_fingerprint", honor: false, fingerprint: "bad", want: "conductor.trust_roster_root_fingerprint"}, + {name: "honor_true_bad_fingerprint", honor: true, fingerprint: "bad", want: "conductor.trust_roster_root_fingerprint"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cfg := Defaults() + conductor := validConductorConfig(t) + conductor.HonorRemoteKillSwitch = tc.honor + conductor.TrustRosterRootFingerprint = tc.fingerprint + cfg.Conductor = conductor + configureConductorRecorder(t, cfg) + + err := cfg.Validate() + if err == nil || !strings.Contains(err.Error(), tc.want) { + t.Fatalf("Validate() = %v, want substring %q", err, tc.want) + } + }) + } +} + +// TestValidateConductor_AcceptsHonorFalseWithFingerprint is the positive +// counterpart: honor_remote_kill_switch=false is a valid configuration as long +// as the pinned fingerprint is present (audit + policy-sync still participate). +func TestValidateConductor_AcceptsHonorFalseWithFingerprint(t *testing.T) { + cfg := Defaults() + conductor := validConductorConfig(t) + conductor.HonorRemoteKillSwitch = false + cfg.Conductor = conductor + configureConductorRecorder(t, cfg) + + if _, err := cfg.ValidateWithWarnings(); err != nil { + t.Fatalf("ValidateWithWarnings() with honor=false + fingerprint should pass, got %v", err) + } +} + func TestValidateConductor_RequiresSignedFlightRecorder(t *testing.T) { tests := []struct { name string diff --git a/internal/license/crl_test.go b/internal/license/crl_test.go index 6b4206d6..74659be6 100644 --- a/internal/license/crl_test.go +++ b/internal/license/crl_test.go @@ -18,7 +18,11 @@ import ( func TestSignParseAndVerifyCRL(t *testing.T) { pub, priv := testKeyPair(t) - now := time.Date(2026, 5, 23, 12, 0, 0, 0, time.UTC) + // Use real time, not a fixed date: testCRL signs via SignCRL, which validates + // the payload's expiry against time.Now(). A hard-coded date makes the 7-day + // CRL expire in wall-clock time and time-bombs the test (the sibling CRL tests + // all use time.Now() for this reason). + now := time.Now().UTC() crl := testCRL(t, priv, now, "lic_revoked") data, err := json.Marshal(crl) diff --git a/internal/sentry/client.go b/internal/sentry/client.go index 5ade4f98..2209a8b5 100644 --- a/internal/sentry/client.go +++ b/internal/sentry/client.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "syscall" "time" "github.com/getsentry/sentry-go" @@ -102,6 +103,10 @@ func initClient(cfg *config.Config, version string, transport sentry.Transport) // CaptureError sends an error event to Sentry (scrubbed by BeforeSend). // context.Canceled is dropped because it signals normal shutdown propagation // (SIGINT, parent exit, session end), not a failure worth paging on. +// Expected operational errors (e.g. a listener bind hitting EADDRINUSE on a +// restart race or double-start) are also dropped: they are misconfiguration or +// environment conditions surfaced to stderr and a non-zero exit, not code +// defects worth a Sentry alert. func (c *Client) CaptureError(err error) { if c == nil || !c.enabled { return @@ -109,9 +114,22 @@ func (c *Client) CaptureError(err error) { if errors.Is(err, context.Canceled) { return } + if isExpectedOperationalError(err) { + return + } sentry.CaptureException(err) } +// isExpectedOperationalError reports whether err is an environment/operator +// condition that should not page Sentry. Listener bind conflicts (EADDRINUSE) +// are the canonical case: a port already in use is an operational state, not a +// pipelock bug. Kept narrow on purpose — only clearly-operational syscall +// conditions belong here, so genuine listen failures (and everything else) +// still report. +func isExpectedOperationalError(err error) bool { + return errors.Is(err, syscall.EADDRINUSE) +} + // CaptureMessage sends a message event to Sentry (scrubbed by BeforeSend). func (c *Client) CaptureMessage(msg string) { if c == nil || !c.enabled {