Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions enterprise/conductor/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var (
ErrWrongKeyPurpose = errors.New("conductor signature key_purpose mismatch")
ErrThresholdRequired = errors.New("conductor signature threshold not met")
ErrForbiddenLicenseField = errors.New("policy bundle contains forbidden license field")
ErrForbiddenBundleSection = errors.New("policy bundle contains a config section not permitted in a signed bundle")
ErrInvalidValidityWindow = errors.New("invalid conductor validity window")
ErrInvalidSequenceRange = errors.New("invalid conductor sequence range")
ErrInvalidState = errors.New("invalid conductor state")
Expand All @@ -81,6 +82,60 @@ var (
ErrInvalidDroppedAccounting = errors.New("invalid conductor dropped accounting")
)

// allowedPolicyBundleSections is the default-deny allowlist of top-level config
// sections a signed policy bundle may carry in its config_yaml. It contains ONLY
// enforcement-policy surfaces — what pipelock decides about a scanned request.
//
// Everything not listed is rejected so a signed bundle cannot reconfigure
// operational/infrastructure surfaces remotely: listeners, telemetry/emit,
// logging, sentry, kill switch, flight recorder, the conductor control plane
// itself, license, or mediation-envelope signing. It also rejects sections that
// mix enforcement with a local trust boundary, identity, certificate, routing,
// or OS-isolation concern — `internal`/`ssrf`/`dns`/`trusted_domains` (SSRF and
// DNS trust), `agents` (per-agent identity/credentials), `tls_interception`
// (MITM certs/passthrough), and `sandbox` (OS isolation) — until those are split
// into narrower policy-only surfaces. Keeping them operator-local means a bundle
// cannot loosen SSRF, add a trusted domain, retarget DNS, push agent identity,
// change TLS interception, or weaken sandboxing.
//
// Default-deny is deliberate: a config section added in a future release is
// automatically rejected from bundles until it is consciously added here.
var allowedPolicyBundleSections = map[string]struct{}{
"version": {}, // schema/version metadata; harmless
"mode": {},
"enforce": {},
"explain_blocks": {},
"api_allowlist": {},
"suppress": {},
"dlp": {},
"canary_tokens": {},
"response_scanning": {},
"mcp_input_scanning": {},
"mcp_tool_scanning": {},
"mcp_tool_policy": {},
"git_protection": {},
"session_profiling": {},
"adaptive_enforcement": {},
"mcp_session_binding": {},
"request_body_scanning": {},
"request_policy": {},
"tool_chain_detection": {},
"cross_request_detection": {},
"address_protection": {},
"seed_phrase_detection": {},
"rules": {},
"mcp_binary_integrity": {},
"mcp_tool_provenance": {},
"behavioral_baseline": {},
"airlock": {},
"browser_shield": {},
"media_policy": {},
"taint": {},
"redaction": {},
"learn": {},
"learn_lock": {},
}

var forbiddenLicenseFields = map[string]struct{}{
"license_key": {},
"license_file": {},
Expand Down Expand Up @@ -482,6 +537,9 @@ func (b PolicyBundle) Validate() error {
if err := rejectLicenseFields(b.Payload.ConfigYAML); err != nil {
return err
}
if err := rejectDisallowedBundleSections(b.Payload.ConfigYAML); err != nil {
return err
}
for _, rb := range b.Payload.RuleBundles {
if err := rb.Validate(); err != nil {
return err
Expand Down Expand Up @@ -1294,6 +1352,42 @@ func walkRejectLicenseFieldsAt(n *yaml.Node, path string) error {
return nil
}

// rejectDisallowedBundleSections enforces the default-deny
// allowedPolicyBundleSections allowlist over the top-level keys of a policy
// bundle's config_yaml. Any top-level section not in the allowlist is rejected.
// Only the top level is checked: the allowlist governs which config SURFACES a
// bundle may touch, not values within an allowed surface. An empty/blank
// config_yaml is left to the caller's existing non-empty check.
func rejectDisallowedBundleSections(configYAML string) error {
dec := yaml.NewDecoder(bytes.NewReader([]byte(configYAML)))
var doc yaml.Node
if err := dec.Decode(&doc); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("%w: parse config payload: %w", ErrForbiddenBundleSection, err)
}
if err := rejectExtraYAMLDocuments(dec); err != nil {
return err
}
if len(doc.Content) == 0 {
return nil
}
root := doc.Content[0]
if root.Kind != yaml.MappingNode {
// A non-mapping top-level document carries no config sections; nothing
// to allow or reject here (other validators handle shape).
return nil
}
for i := 0; i+1 < len(root.Content); i += 2 {
key := root.Content[i].Value
if _, ok := allowedPolicyBundleSections[key]; !ok {
return fmt.Errorf("%w: %q", ErrForbiddenBundleSection, key)
}
}
return nil
}

func rejectExtraYAMLDocuments(dec *yaml.Decoder) error {
var extra yaml.Node
err := dec.Decode(&extra)
Expand Down
2 changes: 1 addition & 1 deletion enterprise/conductor/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func TestRemoteKillMessage_VerifySignaturesThreshold(t *testing.T) {

func testPolicyBundle() PolicyBundle {
payload := PolicyBundlePayload{
ConfigYAML: "mode: strict\nagents:\n claude-code:\n mode: strict\n",
ConfigYAML: "mode: strict\nmcp_tool_policy:\n enabled: true\n",
Comment thread
luckyPipewrench marked this conversation as resolved.
RuleBundles: []RuleBundleRef{{
Name: "official",
Version: "2026.05.23",
Expand Down
244 changes: 244 additions & 0 deletions enterprise/conductor/policysync/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
//go:build enterprise

// Licensed under the Elastic License 2.0. See enterprise/LICENSE.

// Package policysync polls a Conductor leader for the latest signed policy
// bundle scoped to this follower and applies it through the apply boundary.
//
// It mirrors the remote-kill poller (enterprise/conductor/emergency): a single
// goroutine ticks on an interval, issues a GET against the leader over the
// shared mTLS client, and hands the decoded message to an applier. The applier
// here is the follower's policy-bundle apply boundary, which verifies the
// bundle signature, enforces monotonic versioning, and triggers a config
// reload. A rejected bundle leaves the running config untouched (fail closed)
// and is retried on the next interval.
package policysync

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"sync"
"time"

"github.com/luckyPipewrench/pipelock/enterprise/conductor"
)

const (
// LatestPolicyBundlePath mirrors controlplane.LatestPolicyBundlePath. It is
// duplicated rather than imported to keep the follower-side poller free of a
// compile-time dependency on the leader-side controlplane package.
LatestPolicyBundlePath = "/api/v1/conductor/policy/latest"

defaultPollInterval = 30 * time.Second
// defaultResponseBytes caps the policy-bundle response body. A bundle wraps
// a config payload (conductor.MaxConfigYAMLBytes = 256 KiB) plus metadata,
// signatures, and rule-bundle refs; 1 MiB leaves generous headroom while
// bounding memory against a hostile or misbehaving leader.
defaultResponseBytes = 1 << 20

pollerUserAgent = "pipelock-conductor-policy-sync/1.0"
pollerAcceptedCT = "application/json"
)

var (
ErrPollerRequired = errors.New("conductor policy sync poller required")
ErrPollResponse = errors.New("invalid conductor policy bundle poll response")
)

// HTTPDoer is the subset of *http.Client the poller needs. The runtime supplies
// the shared mTLS client; tests supply a stub.
type HTTPDoer interface {
Do(*http.Request) (*http.Response, error)
}

// Applier applies a fetched policy bundle and returns an error if the bundle is
// rejected (bad signature, audience mismatch, version regression, reload
// failure). On error the poller leaves the running config untouched and retries
// on the next interval.
type Applier interface {
ApplyPolicyBundle(conductor.PolicyBundle) error
}

// ApplierFunc adapts a plain function to the Applier interface.
type ApplierFunc func(conductor.PolicyBundle) error

func (f ApplierFunc) ApplyPolicyBundle(b conductor.PolicyBundle) error { return f(b) }

type PollerConfig struct {
BaseURL string
Client HTTPDoer
Applier Applier
PollInterval time.Duration
MaxResponseBytes int64
Logger *slog.Logger
}

type Poller struct {
client HTTPDoer
applier Applier
endpoint string
pollInterval time.Duration
maxResponseBytes int64
logger *slog.Logger

// mu guards lastETag. Run drives PollOnce from a single goroutine, but
// PollOnce is exported and may be called concurrently by tests, so the
// validator cache is mutex-guarded to stay race-clean.
mu sync.Mutex
lastETag string
}

func NewPoller(cfg PollerConfig) (*Poller, error) {
if cfg.Client == nil {
return nil, fmt.Errorf("%w: HTTP client", ErrPollerRequired)
}
if cfg.Applier == nil {
return nil, fmt.Errorf("%w: applier", ErrPollerRequired)
}
endpoint, err := policyEndpoint(cfg.BaseURL)
if err != nil {
return nil, err
}
interval := cfg.PollInterval
if interval == 0 {
interval = defaultPollInterval
}
if interval < time.Second {
return nil, fmt.Errorf("conductor policy bundle poll interval must be >= 1s, got %s", interval)
}
maxBytes := cfg.MaxResponseBytes
if maxBytes == 0 {
maxBytes = defaultResponseBytes
}
if maxBytes <= 0 {
return nil, fmt.Errorf("conductor policy bundle max response bytes must be > 0, got %d", maxBytes)
}
return &Poller{
client: cfg.Client,
applier: cfg.Applier,
endpoint: endpoint,
pollInterval: interval,
maxResponseBytes: maxBytes,
logger: cfg.Logger,
}, nil
}

func (p *Poller) Run(ctx context.Context) error {
if p == nil {
return ErrPollerRequired
}
for {
if err := p.PollOnce(ctx); err != nil {
if errors.Is(err, context.Canceled) || (errors.Is(err, context.DeadlineExceeded) && ctx.Err() != nil) {
return err
}
p.logPollError(err)
}
timer := time.NewTimer(p.pollInterval)
select {
case <-ctx.Done():
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
return ctx.Err()
case <-timer.C:
}
}
}

func (p *Poller) PollOnce(ctx context.Context) error {
if p == nil {
return ErrPollerRequired
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, p.endpoint, nil)
if err != nil {
return fmt.Errorf("create conductor policy bundle poll request: %w", err)
}
req.Header.Set("Accept", pollerAcceptedCT)
req.Header.Set("User-Agent", pollerUserAgent)
p.mu.Lock()
etag := p.lastETag
p.mu.Unlock()
if etag != "" {
req.Header.Set("If-None-Match", etag)
}
resp, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("poll conductor policy bundle: %w", err)
}
defer func() { _ = resp.Body.Close() }()
switch resp.StatusCode {
case http.StatusNotModified, http.StatusNoContent:
// 304: follower already holds the latest. 204: no bundle published for
// this follower's scope. Either way there is nothing to apply.
return nil
case http.StatusOK:
default:
return fmt.Errorf("%w: status=%d", ErrPollResponse, resp.StatusCode)
}
body, err := io.ReadAll(io.LimitReader(resp.Body, p.maxResponseBytes+1))
if err != nil {
return fmt.Errorf("read conductor policy bundle response: %w", err)
}
if int64(len(body)) > p.maxResponseBytes {
return fmt.Errorf("%w: body exceeds %d bytes", ErrPollResponse, p.maxResponseBytes)
}
var bundle conductor.PolicyBundle
decoder := json.NewDecoder(bytes.NewReader(body))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&bundle); err != nil {
return fmt.Errorf("%w: decode: %w", ErrPollResponse, err)
}
if err := decoder.Decode(&struct{}{}); !errors.Is(err, io.EOF) {
return fmt.Errorf("%w: trailing JSON document", ErrPollResponse)
}
if err := p.applier.ApplyPolicyBundle(bundle); err != nil {
return fmt.Errorf("apply conductor policy bundle: %w", err)
}
// Advance the cached validator only after a successful apply, so a transient
// apply failure is retried on the next poll rather than being masked by a
// 304 (the leader would otherwise short-circuit the follower's recovery).
p.mu.Lock()
p.lastETag = resp.Header.Get("ETag")
p.mu.Unlock()
return nil
}

func (p *Poller) logPollError(err error) {
if p.logger == nil {
return
}
p.logger.Warn("conductor_policy_bundle_poll_error",
slog.String("event", "conductor_policy_bundle_poll_error"),
slog.String("error", err.Error()),
)
}

func policyEndpoint(rawBaseURL string) (string, error) {
u, err := url.Parse(rawBaseURL)
if err != nil {
return "", fmt.Errorf("parse conductor policy bundle base URL: %w", err)
}
if u.Scheme != "https" || u.Host == "" {
return "", fmt.Errorf("conductor policy bundle base URL must be https with a host")
}
if u.User != nil || u.RawQuery != "" || u.Fragment != "" {
return "", fmt.Errorf("conductor policy bundle base URL must not include userinfo, query, or fragment")
}
if u.Path != "" && u.Path != "/" {
return "", fmt.Errorf("conductor policy bundle base URL must not include a path component")
}
u.Path = LatestPolicyBundlePath
u.RawPath = ""
return u.String(), nil
}
Loading
Loading