-
-
Notifications
You must be signed in to change notification settings - Fork 78
feat(conductor): wire follower policy-bundle poller #640
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
06a6960
feat(conductor): wire follower policy-bundle poller
luckyPipewrench db0691c
test(conductor): cover bundle poller mTLS and poll-interval error paths
luckyPipewrench d8f04d6
fix(license): replace fixed date in CRL roundtrip test with time.Now()
luckyPipewrench File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,244 @@ | ||
| //go:build enterprise | ||
|
|
||
| // Licensed under the Elastic License 2.0. See enterprise/LICENSE. | ||
|
|
||
| // Package policysync polls a Conductor leader for the latest signed policy | ||
| // bundle scoped to this follower and applies it through the apply boundary. | ||
| // | ||
| // It mirrors the remote-kill poller (enterprise/conductor/emergency): a single | ||
| // goroutine ticks on an interval, issues a GET against the leader over the | ||
| // shared mTLS client, and hands the decoded message to an applier. The applier | ||
| // here is the follower's policy-bundle apply boundary, which verifies the | ||
| // bundle signature, enforces monotonic versioning, and triggers a config | ||
| // reload. A rejected bundle leaves the running config untouched (fail closed) | ||
| // and is retried on the next interval. | ||
| package policysync | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "log/slog" | ||
| "net/http" | ||
| "net/url" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/luckyPipewrench/pipelock/enterprise/conductor" | ||
| ) | ||
|
|
||
| const ( | ||
| // LatestPolicyBundlePath mirrors controlplane.LatestPolicyBundlePath. It is | ||
| // duplicated rather than imported to keep the follower-side poller free of a | ||
| // compile-time dependency on the leader-side controlplane package. | ||
| LatestPolicyBundlePath = "/api/v1/conductor/policy/latest" | ||
|
|
||
| defaultPollInterval = 30 * time.Second | ||
| // defaultResponseBytes caps the policy-bundle response body. A bundle wraps | ||
| // a config payload (conductor.MaxConfigYAMLBytes = 256 KiB) plus metadata, | ||
| // signatures, and rule-bundle refs; 1 MiB leaves generous headroom while | ||
| // bounding memory against a hostile or misbehaving leader. | ||
| defaultResponseBytes = 1 << 20 | ||
|
|
||
| pollerUserAgent = "pipelock-conductor-policy-sync/1.0" | ||
| pollerAcceptedCT = "application/json" | ||
| ) | ||
|
|
||
| var ( | ||
| ErrPollerRequired = errors.New("conductor policy sync poller required") | ||
| ErrPollResponse = errors.New("invalid conductor policy bundle poll response") | ||
| ) | ||
|
|
||
| // HTTPDoer is the subset of *http.Client the poller needs. The runtime supplies | ||
| // the shared mTLS client; tests supply a stub. | ||
| type HTTPDoer interface { | ||
| Do(*http.Request) (*http.Response, error) | ||
| } | ||
|
|
||
| // Applier applies a fetched policy bundle and returns an error if the bundle is | ||
| // rejected (bad signature, audience mismatch, version regression, reload | ||
| // failure). On error the poller leaves the running config untouched and retries | ||
| // on the next interval. | ||
| type Applier interface { | ||
| ApplyPolicyBundle(conductor.PolicyBundle) error | ||
| } | ||
|
|
||
| // ApplierFunc adapts a plain function to the Applier interface. | ||
| type ApplierFunc func(conductor.PolicyBundle) error | ||
|
|
||
| func (f ApplierFunc) ApplyPolicyBundle(b conductor.PolicyBundle) error { return f(b) } | ||
|
|
||
| type PollerConfig struct { | ||
| BaseURL string | ||
| Client HTTPDoer | ||
| Applier Applier | ||
| PollInterval time.Duration | ||
| MaxResponseBytes int64 | ||
| Logger *slog.Logger | ||
| } | ||
|
|
||
| type Poller struct { | ||
| client HTTPDoer | ||
| applier Applier | ||
| endpoint string | ||
| pollInterval time.Duration | ||
| maxResponseBytes int64 | ||
| logger *slog.Logger | ||
|
|
||
| // mu guards lastETag. Run drives PollOnce from a single goroutine, but | ||
| // PollOnce is exported and may be called concurrently by tests, so the | ||
| // validator cache is mutex-guarded to stay race-clean. | ||
| mu sync.Mutex | ||
| lastETag string | ||
| } | ||
|
|
||
| func NewPoller(cfg PollerConfig) (*Poller, error) { | ||
| if cfg.Client == nil { | ||
| return nil, fmt.Errorf("%w: HTTP client", ErrPollerRequired) | ||
| } | ||
| if cfg.Applier == nil { | ||
| return nil, fmt.Errorf("%w: applier", ErrPollerRequired) | ||
| } | ||
| endpoint, err := policyEndpoint(cfg.BaseURL) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| interval := cfg.PollInterval | ||
| if interval == 0 { | ||
| interval = defaultPollInterval | ||
| } | ||
| if interval < time.Second { | ||
| return nil, fmt.Errorf("conductor policy bundle poll interval must be >= 1s, got %s", interval) | ||
| } | ||
| maxBytes := cfg.MaxResponseBytes | ||
| if maxBytes == 0 { | ||
| maxBytes = defaultResponseBytes | ||
| } | ||
| if maxBytes <= 0 { | ||
| return nil, fmt.Errorf("conductor policy bundle max response bytes must be > 0, got %d", maxBytes) | ||
| } | ||
| return &Poller{ | ||
| client: cfg.Client, | ||
| applier: cfg.Applier, | ||
| endpoint: endpoint, | ||
| pollInterval: interval, | ||
| maxResponseBytes: maxBytes, | ||
| logger: cfg.Logger, | ||
| }, nil | ||
| } | ||
|
|
||
| func (p *Poller) Run(ctx context.Context) error { | ||
| if p == nil { | ||
| return ErrPollerRequired | ||
| } | ||
| for { | ||
| if err := p.PollOnce(ctx); err != nil { | ||
| if errors.Is(err, context.Canceled) || (errors.Is(err, context.DeadlineExceeded) && ctx.Err() != nil) { | ||
| return err | ||
| } | ||
| p.logPollError(err) | ||
| } | ||
| timer := time.NewTimer(p.pollInterval) | ||
| select { | ||
| case <-ctx.Done(): | ||
| if !timer.Stop() { | ||
| select { | ||
| case <-timer.C: | ||
| default: | ||
| } | ||
| } | ||
| return ctx.Err() | ||
| case <-timer.C: | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (p *Poller) PollOnce(ctx context.Context) error { | ||
| if p == nil { | ||
| return ErrPollerRequired | ||
| } | ||
| req, err := http.NewRequestWithContext(ctx, http.MethodGet, p.endpoint, nil) | ||
| if err != nil { | ||
| return fmt.Errorf("create conductor policy bundle poll request: %w", err) | ||
| } | ||
| req.Header.Set("Accept", pollerAcceptedCT) | ||
| req.Header.Set("User-Agent", pollerUserAgent) | ||
| p.mu.Lock() | ||
| etag := p.lastETag | ||
| p.mu.Unlock() | ||
| if etag != "" { | ||
| req.Header.Set("If-None-Match", etag) | ||
| } | ||
| resp, err := p.client.Do(req) | ||
| if err != nil { | ||
| return fmt.Errorf("poll conductor policy bundle: %w", err) | ||
| } | ||
| defer func() { _ = resp.Body.Close() }() | ||
| switch resp.StatusCode { | ||
| case http.StatusNotModified, http.StatusNoContent: | ||
| // 304: follower already holds the latest. 204: no bundle published for | ||
| // this follower's scope. Either way there is nothing to apply. | ||
| return nil | ||
| case http.StatusOK: | ||
| default: | ||
| return fmt.Errorf("%w: status=%d", ErrPollResponse, resp.StatusCode) | ||
| } | ||
| body, err := io.ReadAll(io.LimitReader(resp.Body, p.maxResponseBytes+1)) | ||
| if err != nil { | ||
| return fmt.Errorf("read conductor policy bundle response: %w", err) | ||
| } | ||
| if int64(len(body)) > p.maxResponseBytes { | ||
| return fmt.Errorf("%w: body exceeds %d bytes", ErrPollResponse, p.maxResponseBytes) | ||
| } | ||
| var bundle conductor.PolicyBundle | ||
| decoder := json.NewDecoder(bytes.NewReader(body)) | ||
| decoder.DisallowUnknownFields() | ||
| if err := decoder.Decode(&bundle); err != nil { | ||
| return fmt.Errorf("%w: decode: %w", ErrPollResponse, err) | ||
| } | ||
| if err := decoder.Decode(&struct{}{}); !errors.Is(err, io.EOF) { | ||
| return fmt.Errorf("%w: trailing JSON document", ErrPollResponse) | ||
| } | ||
| if err := p.applier.ApplyPolicyBundle(bundle); err != nil { | ||
| return fmt.Errorf("apply conductor policy bundle: %w", err) | ||
| } | ||
| // Advance the cached validator only after a successful apply, so a transient | ||
| // apply failure is retried on the next poll rather than being masked by a | ||
| // 304 (the leader would otherwise short-circuit the follower's recovery). | ||
| p.mu.Lock() | ||
| p.lastETag = resp.Header.Get("ETag") | ||
| p.mu.Unlock() | ||
| return nil | ||
| } | ||
|
|
||
| func (p *Poller) logPollError(err error) { | ||
| if p.logger == nil { | ||
| return | ||
| } | ||
| p.logger.Warn("conductor_policy_bundle_poll_error", | ||
| slog.String("event", "conductor_policy_bundle_poll_error"), | ||
| slog.String("error", err.Error()), | ||
| ) | ||
| } | ||
|
|
||
| func policyEndpoint(rawBaseURL string) (string, error) { | ||
| u, err := url.Parse(rawBaseURL) | ||
| if err != nil { | ||
| return "", fmt.Errorf("parse conductor policy bundle base URL: %w", err) | ||
| } | ||
| if u.Scheme != "https" || u.Host == "" { | ||
| return "", fmt.Errorf("conductor policy bundle base URL must be https with a host") | ||
| } | ||
| if u.User != nil || u.RawQuery != "" || u.Fragment != "" { | ||
| return "", fmt.Errorf("conductor policy bundle base URL must not include userinfo, query, or fragment") | ||
| } | ||
| if u.Path != "" && u.Path != "/" { | ||
| return "", fmt.Errorf("conductor policy bundle base URL must not include a path component") | ||
| } | ||
| u.Path = LatestPolicyBundlePath | ||
| u.RawPath = "" | ||
| return u.String(), nil | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.