From 2ebecc8e42dba5cf9d41124cc1338d63018ef2e0 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 28 Oct 2025 17:35:51 -0400 Subject: [PATCH 01/11] Be-very defensive when it comes to updating local_metadata. --- internal/pkg/api/handleCheckin.go | 40 ++++---- internal/pkg/api/handleCheckin_test.go | 128 +++++++++++++++++++++++-- internal/pkg/checkin/bulk.go | 29 ++++-- internal/pkg/checkin/bulk_test.go | 54 +++++++++++ 4 files changed, 221 insertions(+), 30 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 6baf1d0b4e..ed55ae65bc 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -1037,30 +1037,36 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] return nil, nil } + // Check for empty string - not valid metadata + if str, ok := reqLocalMeta.(string); ok && str == "" { + zlog.Warn().Msg("local metadata empty; won't update metadata") + return nil, nil + } + // Deserialize the agent's metadata copy var agentLocalMeta interface{} - if err := json.Unmarshal(agent.LocalMetadata, &agentLocalMeta); err != nil { - return nil, fmt.Errorf("parseMeta local: %w", err) + if agent.LocalMetadata != nil { + if err := json.Unmarshal(agent.LocalMetadata, &agentLocalMeta); err != nil { + // if it has metadata in the document it has to be JSON or the mapping is incorrect + return nil, fmt.Errorf("parseMeta local: %w", err) + } } - var outMeta []byte - - // Compare the deserialized meta structures and return the bytes to update if different - if !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { - - zlog.Trace(). - RawJSON("oldLocalMeta", agent.LocalMetadata). - RawJSON("newLocalMeta", req.LocalMetadata). - Msg("local metadata not equal") + // Compare the deserialized meta structures, already the same means no update needs to occur. + if reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { + return nil, nil + } - zlog.Info(). 
- RawJSON("req.LocalMeta", req.LocalMetadata). - Msg("applying new local metadata") + zlog.Trace(). + RawJSON("oldLocalMeta", agent.LocalMetadata). + RawJSON("newLocalMeta", req.LocalMetadata). + Msg("local metadata not equal") - outMeta = req.LocalMetadata - } + zlog.Info(). + RawJSON("req.LocalMeta", req.LocalMetadata). + Msg("applying new local metadata") - return outMeta, nil + return req.LocalMetadata, nil } func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) { diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index 07b775f78d..7f31be0012 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -1050,11 +1050,12 @@ func TestValidateCheckinRequest(t *testing.T) { verCon := mustBuildConstraints("8.0.0") tests := []struct { - name string - req *http.Request - cfg *config.Server - expErr error - expValid validatedCheckin + name string + req *http.Request + cfg *config.Server + currentMeta json.RawMessage + expErr error + expValid validatedCheckin }{ { name: "Invalid JSON", @@ -1118,6 +1119,121 @@ func TestValidateCheckinRequest(t *testing.T) { rawMeta: []byte(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`), }, }, + { + name: "local metadata matches", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "fips": true}}}}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + rawMeta: nil, // no need to update + }, + }, + { + name: "local metadata different JSON formatting", + req: &http.Request{ + // JSON with specific key ordering + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": 
"test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "version": "8.0.0"}}, "host": {"hostname": "test-host"}}}`)), + }, + expErr: nil, + // Same content but different key ordering in JSON - when unmarshaled and compared + // with reflect.DeepEqual they should be equal, but raw bytes are different + currentMeta: json.RawMessage(`{"host":{"hostname":"test-host"},"elastic":{"agent":{"version":"8.0.0","id":"testid"}}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + rawMeta: nil, // should recognize as same content despite different formatting + }, + }, + { + name: "local metadata is empty string and agent has nil", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), + }, + expErr: nil, + currentMeta: nil, + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is empty string and agent has different value", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is null and agent has nil", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), + }, + expErr: nil, + currentMeta: nil, + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't 
update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is null and agent has existing metadata", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, } for _, tc := range tests { @@ -1126,7 +1242,7 @@ func TestValidateCheckinRequest(t *testing.T) { assert.NoError(t, err) wr := httptest.NewRecorder() logger := testlog.SetLogger(t) - valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: json.RawMessage(`{}`)}) + valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: tc.currentMeta}) if tc.expErr == nil { assert.NoError(t, err) assert.Equal(t, tc.expValid.rawMeta, valid.rawMeta) diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index b68692ab8b..8892c94499 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -64,6 +64,10 @@ func WithUnhealthyReason(reason *[]string) Option { func WithMeta(meta []byte) Option { return func(pending *pendingT) { + if len(meta) == 0 { + // no real meta; do nothing + return + } if pending.extra == nil { pending.extra = &extraT{} } @@ -94,6 +98,10 @@ func WithVer(ver string) Option { func WithComponents(components []byte) Option { return func(pending *pendingT) { + if len(components) == 0 { + // no real components; do nothing + return + } if pending.extra == nil { pending.extra = &extraT{} } @@ -198,8 +206,16 @@ func (bc *Bulk) timestamp() string { // WARNING: Bulk will take ownership of fields, so do not use after passing in. 
func (bc *Bulk) CheckIn(id string, opts ...Option) error { bc.mut.Lock() - pending := pendingT{ - ts: bc.timestamp(), + defer bc.mut.Unlock() + + // possible there is already a check-in queued for the same ID + // if that is present we need to be sure to use that pending + // instead of creating a new one + pending, ok := bc.pending[id] + if !ok { + pending = pendingT{ + ts: bc.timestamp(), + } } for _, opt := range opts { @@ -207,7 +223,6 @@ func (bc *Bulk) CheckIn(id string, opts ...Option) error { } bc.pending[id] = pending - bc.mut.Unlock() return nil } @@ -341,16 +356,16 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { } } - // Update local metadata if provided - if pending.extra.meta != nil { + // Update local metadata if provided (and has a value) + if len(pending.extra.meta) > 0 { // Surprise: The json encoder compacts this raw JSON during // the encode process, so there my be unexpected memory overhead: // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta) } - // Update components if provided - if pending.extra.components != nil { + // Update components if provided (and has a value) + if len(pending.extra.components) > 0 { fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index d0cb344d54..4e325707b6 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -201,6 +201,60 @@ func TestBulkSimple(t *testing.T) { } } +func TestBulkReusePending(t *testing.T) { + ctx := testlog.SetLogger(t).WithContext(t.Context()) + + const ( + agentID = "test-agent-id" + status = "online" + message = "test message" + ) + + meta := []byte(`{"test":"metadata"}`) + + // Matcher that validates both the existing field (status) and new field (meta) are present + matchAccumulatedOps := func(ops 
[]bulk.MultiOp) bool { + if len(ops) != 1 { + t.Errorf("Expected 1 operation, got %d", len(ops)) + return false + } + if ops[0].ID != agentID { + t.Errorf("Expected ID %s, got %s", agentID, ops[0].ID) + return false + } + + type updateT struct { + Status string `json:"last_checkin_status"` + Meta json.RawMessage `json:"local_metadata"` + } + + m := make(map[string]updateT) + err := json.Unmarshal(ops[0].Body, &m) + require.NoErrorf(t, err, "unable to validate operation body %s", string(ops[0].Body)) + + sub, ok := m["doc"] + require.True(t, ok, "unable to validate operation: expected doc") + + assert.Equal(t, status, sub.Status, "Expected status from first CheckIn to be preserved") + assert.Equal(t, json.RawMessage(meta), sub.Meta, "Expected metadata from second CheckIn to be added") + return true + } + + mockBulk := ftesting.NewMockBulk() + mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchAccumulatedOps), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() + + bc := NewBulk(mockBulk) + + err := bc.CheckIn(agentID, WithStatus(status), WithMessage(message)) + require.NoError(t, err) + err = bc.CheckIn(agentID, WithMeta(meta)) + require.NoError(t, err) + err = bc.flush(ctx) + require.NoError(t, err) + + mockBulk.AssertExpectations(t) +} + func validateTimestamp(tb testing.TB, start time.Time, ts string) { t1, err := time.Parse(time.RFC3339, ts) require.NoErrorf(tb, err, "expected %q to be in RFC 3339 format", ts) From 96fa69b3fcb9da3788b496205b282b4365978ee8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 28 Oct 2025 17:41:13 -0400 Subject: [PATCH 02/11] Add changelog entry. 
--- ...kin-local_metadata-from-being-updated.yaml | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 changelog/fragments/1761687609-fix-issue-prevent-checkin-local_metadata-from-being-updated.yaml diff --git a/changelog/fragments/1761687609-fix-issue-prevent-checkin-local_metadata-from-being-updated.yaml b/changelog/fragments/1761687609-fix-issue-prevent-checkin-local_metadata-from-being-updated.yaml new file mode 100644 index 0000000000..87af0d21fc --- /dev/null +++ b/changelog/fragments/1761687609-fix-issue-prevent-checkin-local_metadata-from-being-updated.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: fix issue prevent checkin local_metadata from being updated + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. 
+component: + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 From 114e494d9083611c6a63fcbff34b26fd4608455f Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 29 Oct 2025 15:48:42 -0400 Subject: [PATCH 03/11] Bump the EventuallyWithT to 20 seconds in E2E tests. This is because the checkin bulker has a default timeout of 10 seconds, meaning the original 10 seconds could result in it being missed by the check. 
--- testing/e2e/api_version/client_api_current.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/testing/e2e/api_version/client_api_current.go b/testing/e2e/api_version/client_api_current.go index 7ed80cccdf..0e7afeec89 100644 --- a/testing/e2e/api_version/client_api_current.go +++ b/testing/e2e/api_version/client_api_current.go @@ -576,7 +576,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // Check in with revIDX that does not exist // POLICY_CHANGE should be returned @@ -612,7 +612,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, newRevIDX, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // Update policy // Get the policy then "update" it without changing anything - revision ID should increment @@ -682,7 +682,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // Do a normal checkin to "reset" to latest revision_idx // no actions are returned @@ -704,7 +704,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // Test that if the agent is "restored" to an earlier revIDX a policy_change is sent prevRev := revIDX - 1 @@ -725,7 +725,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := 
tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, prevRev, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // agent is now recorded as on a previous revision - check to make sure a checkin without AgentPolicyId and revision result in a POLICY_CHANGE action tester.T().Logf("test checkin 7: agent %s with no policy or revision", agentID) @@ -765,7 +765,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*10, time.Second) + }, time.Second*20, time.Second) // sanity check agent status in kibana tester.AgentIsOnline(ctx, agentID) From f0fcf4a43ce8f587be0712394f9bb964f7fe36b9 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 3 Nov 2025 17:43:58 -0500 Subject: [PATCH 04/11] Improve checkin. --- internal/pkg/api/handleCheckin.go | 223 ++++---- internal/pkg/api/handleCheckin_test.go | 7 +- internal/pkg/checkin/bulk.go | 477 ++++++++---------- internal/pkg/checkin/bulk_test.go | 106 +--- .../deleteAuditFieldsOnCheckin.painless | 21 - internal/pkg/policy/sub.go | 2 +- internal/pkg/server/fleet.go | 2 +- internal/pkg/server/fleet_integration_test.go | 9 +- 8 files changed, 321 insertions(+), 526 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index ed55ae65bc..78098e6b33 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -277,13 +277,31 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. // Older agents will communicate any issues with upgrades via the Ack endpoint. 
- if err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails); err != nil { + upgradeCheckinOpts, err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails) + if err != nil { return fmt.Errorf("failed to update upgrade_details: %w", err) } - initialOpts := []checkin.Option{ + // Handle the policy ID and the revision idx upon check-in. We want to always ensure that + // the agent document represents the current state of this check-in. + policyID := agent.PolicyID + revIdx := agent.PolicyRevisionIdx + if !ct.cfg.Features.IgnoreCheckinPolicyID && req.AgentPolicyId != nil && req.PolicyRevisionIdx != nil { + policyID = *req.AgentPolicyId + revIdx = *req.PolicyRevisionIdx + zlog.Debug(). + Str("policy_id", agent.PolicyID). + Str("checkin_policy_id", policyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("checkin_revision_idx", revIdx). + Msg("checkin included policy_id and revision_idx") + } + + checkinOpts := []checkin.Option{ checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), + checkin.WithAgentPolicyID(policyID), + checkin.WithPolicyRevisionIDX(revIdx), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), @@ -291,45 +309,65 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""), } - - revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req) - if err != nil { - return fmt.Errorf("failed to update policy details: %w", err) - } - if len(opts) > 0 { - initialOpts = append(initialOpts, opts...) + if upgradeCheckinOpts != nil { + checkinOpts = append(checkinOpts, upgradeCheckinOpts...) 
} - // Subscribe to actions dispatcher - aSub := ct.ad.Subscribe(zlog, agent.Id, seqno) - defer ct.ad.Unsubscribe(zlog, aSub) - actCh := aSub.Ch() - + // determine the effective policy ID and revision idx to use for the policy monitor. + // this can be different based on the situations to force a POLICY_CHANGE. + effectivePolicyID := policyID + effectiveRevIdx := revIdx for _, output := range agent.Outputs { if output.APIKey == "" { // use revision_idx=0 if the agent has a single output where no API key is defined // This will force the policy monitor to emit a new policy to regerate API keys - revID = 0 + effectiveRevIdx = 0 break } } + if effectivePolicyID != agent.PolicyID || effectiveRevIdx != agent.PolicyRevisionIdx { + // the agent is either on the wrong policy or the wrong revision of that policy + zlog.Info(). + Str("policy_id", agent.PolicyID). + Str("effective_policy_id", effectivePolicyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("effective_revision_idx", effectiveRevIdx). 
+ Msg("effective policy is different; resulting in a POLICY_CHANGE") + + // skipped when ignore checkin is enabled, because this never used to occur until this started + // using the policy information in the checkin body + if !ct.cfg.Features.IgnoreCheckinPolicyID { + // this invalidates any of the API keys + for outputName, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + err = updateAPIKey(r.Context(), zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName) + if err != nil { + // Only returns ErrUpdatingInactiveAgent + return fmt.Errorf("failed to update policy details: %w", err) + } + } + } + } + + // Subscribe to actions dispatcher + aSub := ct.ad.Subscribe(zlog, agent.Id, seqno) + defer ct.ad.Unsubscribe(zlog, aSub) + actCh := aSub.Ch() // Subscribe to policy manager for changes on PolicyId > policyRev - sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID) + sub, err := ct.pm.Subscribe(agent.Id, effectivePolicyID, effectiveRevIdx) if err != nil { return fmt.Errorf("subscribe policy monitor: %w", err) } defer func() { err := ct.pm.Unsubscribe(sub) if err != nil { - zlog.Error().Err(err).Str(ecs.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy") + zlog.Error().Err(err).Str(ecs.PolicyID, effectivePolicyID).Msg("unable to unsubscribe from policy") } }() - // Update check-in timestamp on timeout - tick := time.NewTicker(ct.cfg.Timeouts.CheckinTimestamp) - defer tick.Stop() - setupDuration := time.Since(start) pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter) @@ -345,14 +383,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r longPoll := time.NewTicker(pollDuration) defer longPoll.Stop() - // Initial update on checkin, and any user fields that might have changed - // Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set 
on checkin. - // 8.16.x releases would incorrectly set unenrolled_at - err = ct.bc.CheckIn(agent.Id, initialOpts...) + // Initial update on checkin updates any user fields that might have changed. + err = ct.bc.CheckIn(r.Context(), agent.Id, checkinOpts...) if err != nil { zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") + return err } + // Register the agent with the checkin to keep the `updated_at` field updated while connected. + // The defer will ensure that on return or panic that it is removed. + ct.bc.Add(agent.Id) + defer ct.bc.Remove(agent.Id) + // Initial fetch for pending actions var ( actions []Action @@ -402,11 +444,6 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r case <-longPoll.C: zlog.Trace().Msg("fire long poll") break LOOP - case <-tick.C: - err := ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithComponents(rawComponents), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason)) // FIXME If we change to properly handle empty strings we could stop passing optional args here. - if err != nil { - zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") - } } } } @@ -445,23 +482,26 @@ func (ct *CheckinT) verifyActionExists(vCtx context.Context, vSpan *apm.Span, ag // if the agent doc and checkin details are both nil the method is a nop // if the checkin upgrade_details is nil but there was a previous value in the agent doc, fleet-server treats it as a successful upgrade // otherwise the details are validated; action_id is checked and upgrade_details.metadata is validated based on upgrade_details.state and the agent doc is updated. 
-func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails) error { +func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails) ([]checkin.Option, error) { if details == nil { - err := ct.markUpgradeComplete(ctx, agent) - if err != nil { - return err + if agent.UpgradeDetails == nil { + return nil, nil } - return nil + return []checkin.Option{ + checkin.WithUpgradeDetails(nil), + checkin.WithUpgradeStartedAt(nil), + checkin.WithUpgradeStatus(nil), + }, nil } - // update docs with in progress details + // update docs with in progress details vSpan, vCtx := apm.StartSpan(ctx, "Check update action", "validate") action, err := ct.verifyActionExists(ctx, vSpan, agent, details) if err != nil { - return err + return nil, err } if action == nil { - return nil + return nil, nil } // link action with APM spans @@ -495,84 +535,61 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen upgradeDetails, err := details.Metadata.AsUpgradeMetadataDownloading() if err != nil { vSpan.End() - return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } if err := details.Metadata.FromUpgradeMetadataDownloading(upgradeDetails); err != nil { vSpan.End() - return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } case UpgradeDetailsStateUPGFAILED: if details.Metadata == nil { vSpan.End() - return fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataFailed() if err != nil { vSpan.End() - return 
fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } if meta.ErrorMsg == "" { vSpan.End() - return fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) + return nil, fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) } // Repack metadata in failed case as the agent may send UPG_DOWNLOADING attributes. if err = details.Metadata.FromUpgradeMetadataFailed(meta); err != nil { vSpan.End() - return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } case UpgradeDetailsStateUPGSCHEDULED: if details.Metadata == nil { vSpan.End() - return fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataScheduled() if err != nil { vSpan.End() - return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) } if meta.ScheduledAt.IsZero() { vSpan.End() - return fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) + return nil, fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) } default: } vSpan.End() - doc := bulk.UpdateFields{ - dl.FieldUpgradeDetails: details, - } - if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { - doc[dl.FieldUpgradeAttempts] = nil - } - - body, err := 
doc.Marshal() + detailsBody, err := json.Marshal(details) if err != nil { - return err + return nil, fmt.Errorf("failed to marshal upgrade details: %w", err) } - return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) -} - -func (ct *CheckinT) markUpgradeComplete(ctx context.Context, agent *model.Agent) error { - // nop if there are no checkin details, and the agent has no details - if agent.UpgradeDetails == nil { - return nil + opts := []checkin.Option{ + checkin.WithUpgradeDetails(&detailsBody), } - span, ctx := apm.StartSpan(ctx, "Mark update complete", "update") - span.Context.SetLabel("agent_id", agent.Agent.ID) - defer span.End() - // if the checkin had no details, but agent has details treat like a successful upgrade - doc := bulk.UpdateFields{ - dl.FieldUpgradeDetails: nil, - dl.FieldUpgradeStartedAt: nil, - dl.FieldUpgradeStatus: nil, - dl.FieldUpgradedAt: time.Now().UTC().Format(time.RFC3339), - } - body, err := doc.Marshal() - if err != nil { - return err + if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { + opts = append(opts, checkin.WithUpgradeAttempts(nil)) } - return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) + return opts, nil } func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, agent *model.Agent, resp CheckinResponse) error { @@ -1198,55 +1215,3 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu return pollDuration, jitter } - -// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request. -// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported. 
-// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin. -func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) { - // no details specified or attributes are ignored by config - if ct.cfg.Features.IgnoreCheckinPolicyID || req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil { - return agent.PolicyRevisionIdx, nil, nil - } - policyID := *req.AgentPolicyId - revisionIDX := *req.PolicyRevisionIdx - - span, ctx := apm.StartSpan(ctx, "Process policy details", "process") - span.Context.SetLabel("agent_id", agent.Agent.ID) - span.Context.SetLabel(dl.FieldAgentPolicyID, policyID) - span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX) - defer span.End() - - // update agent doc if policy id or revision idx does not match - var opts []checkin.Option - if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { - opts = []checkin.Option{ - checkin.WithAgentPolicyID(policyID), - checkin.WithPolicyRevisionIDX(revisionIDX), - } - } - // Policy reassign, subscribe to policy with revision 0 - if policyID != agent.PolicyID { - zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.") - return 0, opts, nil - } - - // Check if the checkin revision_idx is greater than the latest available - latestRev := ct.pm.LatestRev(ctx, agent.PolicyID) - if latestRev != 0 && revisionIDX > latestRev { - revisionIDX = 0 // set return val to 0 so the agent gets latest available revision. - } - - // Update API keys if the policy has changed, or if the revision differs. 
- if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { - for outputName, output := range agent.Outputs { - if output.Type != policy.OutputTypeElasticsearch { - continue - } - if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil { - // Only returns ErrUpdatingInactiveAgent - return 0, nil, err - } - } - } - return revisionIDX, opts, nil -} diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index 7f31be0012..c52a324052 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -391,16 +391,13 @@ func TestProcessUpgradeDetails(t *testing.T) { name string agent *model.Agent details *UpgradeDetails - bulk func() *ftesting.MockBulk + opts []checkin.Option cache func() *testcache.MockCache err error }{{ name: "agent and checkin details are nil", agent: &model.Agent{ESDocument: esd}, details: nil, - bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() - }, cache: func() *testcache.MockCache { return testcache.NewMockCache() }, @@ -765,7 +762,7 @@ func TestProcessUpgradeDetails(t *testing.T) { bulker: mBulk, } - err := ct.processUpgradeDetails(context.Background(), tc.agent, tc.details) + opts, err := ct.processUpgradeDetails(context.Background(), tc.agent, tc.details) if tc.err == nil { assert.NoError(t, err) } else { diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 8892c94499..9493e23a9e 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -9,7 +9,6 @@ import ( "context" _ "embed" "encoding/json" - "errors" "fmt" "sync" "time" @@ -42,136 +41,157 @@ func WithFlushInterval(d time.Duration) Opt { } // Option is the type for optional arguments for agent checkins. 
-type Option func(*pendingT) +type Option func(*checkinT) func WithStatus(status string) Option { - return func(pending *pendingT) { - pending.status = status + return func(state *checkinT) { + state.status.isSet = true + state.status.value = status } } func WithMessage(message string) Option { - return func(pending *pendingT) { - pending.message = message + return func(state *checkinT) { + state.message.isSet = true + state.message.value = message } } func WithUnhealthyReason(reason *[]string) Option { - return func(pending *pendingT) { - pending.unhealthyReason = reason + return func(state *checkinT) { + state.unhealthyReason = reason } } func WithMeta(meta []byte) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if len(meta) == 0 { // no real meta; do nothing return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.meta = meta + state.meta = meta } } func WithSeqNo(seqno sqn.SeqNo) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if !seqno.IsSet() { return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.seqNo = seqno + state.seqNo = seqno } } func WithVer(ver string) Option { - return func(pending *pendingT) { - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.ver = ver + return func(state *checkinT) { + state.ver = ver } } func WithComponents(components []byte) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if len(components) == 0 { // no real components; do nothing return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.components = components + state.components = components } } func WithDeleteAudit(del bool) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if !del { return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.deleteAudit = del + state.deleteAudit = del } } func WithAgentPolicyID(id string) Option 
{ - return func(pending *pendingT) { - pending.agentPolicyID = id + return func(state *checkinT) { + state.agentPolicyID.isSet = true + state.agentPolicyID.value = id } } func WithPolicyRevisionIDX(idx int64) Option { - return func(pending *pendingT) { - pending.revisionIDX = idx + return func(state *checkinT) { + state.revisionIDX.isSet = true + state.revisionIDX.value = idx } } -type extraT struct { - meta []byte - seqNo sqn.SeqNo - ver string - components []byte - deleteAudit bool +func WithUpgradeDetails(details *[]byte) Option { + return func(state *checkinT) { + state.upgradeDetails.isSet = true + state.upgradeDetails.value = details + } +} + +func WithUpgradeStartedAt(startedAt *string) Option { + return func(state *checkinT) { + state.upgradeStartedAt.isSet = true + state.upgradeStartedAt.value = startedAt + } } -// Minimize the size of this structure. -// There will be 10's of thousands of items -// in the map at any point. -type pendingT struct { - ts string - status string - message string - agentPolicyID string // may be empty - revisionIDX int64 - extra *extraT +func WithUpgradeStatus(status *string) Option { + return func(state *checkinT) { + state.upgradeStatus.isSet = true + state.upgradeStatus.value = status + } +} + +func WithUpgradeAttempts(attempts *[]string) Option { + return func(state *checkinT) { + state.upgradeAttempts.isSet = true + state.upgradeAttempts.value = attempts + } +} + +type setT[T any] struct { + isSet bool + value T +} + +type checkinT struct { + ts string + seqNo sqn.SeqNo + + ver string + status setT[string] + message setT[string] unhealthyReason *[]string + + agentPolicyID setT[string] + revisionIDX setT[int64] + + meta []byte + components []byte + + upgradeDetails setT[*[]byte] + upgradeStartedAt setT[*string] + upgradeStatus setT[*string] + upgradeAttempts setT[*[]string] + + deleteAudit bool } -// Bulk will batch pending checkins and update elasticsearch at a set interval. 
+// Bulk handles checkins and keeps connected agents updated_at timestamp up-to-date. type Bulk struct { - opts optionsT - bulker bulk.Bulk - mut sync.Mutex - pending map[string]pendingT - - ts string - unix int64 + opts optionsT + bulker bulk.Bulk + mut sync.RWMutex + connected map[string]struct{} } func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk { parsedOpts := parseOpts(opts...) return &Bulk{ - opts: parsedOpts, - bulker: bulker, - pending: make(map[string]pendingT), + opts: parsedOpts, + bulker: bulker, + connected: make(map[string]struct{}), } } @@ -187,42 +207,68 @@ func parseOpts(opts ...Opt) optionsT { return outOpts } -// Generate and cache timestamp on seconds change. -// Avoid thousands of formats of an identical string. -func (bc *Bulk) timestamp() string { - // WARNING: Expects mutex locked. - now := time.Now() - if now.Unix() != bc.unix { - bc.unix = now.Unix() - bc.ts = now.UTC().Format(time.RFC3339) - } - - return bc.ts +// Add adds the agent to the connected list. +func (bc *Bulk) Add(id string) { + bc.mut.Lock() + defer bc.mut.Unlock() + bc.connected[id] = struct{}{} } -// CheckIn will add the agent (identified by id) to the pending set. -// The pending agents are sent to elasticsearch as a bulk update at each flush interval. -// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down. -// WARNING: Bulk will take ownership of fields, so do not use after passing in. -func (bc *Bulk) CheckIn(id string, opts ...Option) error { +// Remove removes the agent from the connected list. 
+func (bc *Bulk) Remove(id string) { bc.mut.Lock() defer bc.mut.Unlock() + delete(bc.connected, id) +} - // possible there is already a check-in queued for the same ID - // if that is present we need to be sure to use that pending - // instead of creating a new one - pending, ok := bc.pending[id] - if !ok { - pending = pendingT{ - ts: bc.timestamp(), - } +// CheckIn records that the agent has checked-in. +// +// This does not call `Add`, the caller should also call that to record that the agent is connected so that bulk +// can keep the documents `updated_at` fields updated. +// WARNING: CheckIn will take ownership of fields, so do not use after passing in. +func (bc *Bulk) CheckIn(ctx context.Context, id string, opts ...Option) error { + now := time.Now().UTC().Format(time.RFC3339) + checkin := checkinT{ + ts: now, } - for _, opt := range opts { - opt(&pending) + opt(&checkin) + } + + // update the agent document + body, err := toUpdateBody(checkin) + if err != nil { + return fmt.Errorf("could not marshall update body: %w", err) + } + bulkOpts := []bulk.Opt{bulk.WithRetryOnConflict(3)} + if checkin.seqNo.IsSet() { + bulkOpts = append(bulkOpts, bulk.WithRefresh()) + } + err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulkOpts...) + if err != nil { + return fmt.Errorf("failed to update document: %w", err) + } + + // deleteAudit performs a second request (being that this should not happen very often + // it's safer to only use this script when it is needed). 
+ if checkin.deleteAudit { + action := &estypes.UpdateAction{ + Script: &estypes.Script{ + Lang: &scriptlanguage.Painless, + Source: &deleteAuditAttributesScript, + Options: map[string]string{}, + }, + } + body, err = json.Marshal(&action) + if err != nil { + return fmt.Errorf("could not marshall script action: %w", err) + } + err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulk.WithRetryOnConflict(3)) + if err != nil { + return fmt.Errorf("failed to remove audit fields with script update: %w", err) + } } - bc.pending[id] = pending return nil } @@ -234,7 +280,7 @@ func (bc *Bulk) Run(ctx context.Context) error { for { select { case <-tick.C: - if err := bc.flush(ctx); err != nil { + if err := bc.flushConnected(ctx); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") } @@ -244,201 +290,86 @@ func (bc *Bulk) Run(ctx context.Context) error { } } -// flush sends the minium data needed to update records in elasticsearch. -func (bc *Bulk) flush(ctx context.Context) error { - start := time.Now() - - bc.mut.Lock() - pending := bc.pending - bc.pending = make(map[string]pendingT, len(pending)) - bc.mut.Unlock() - - if len(pending) == 0 { - return nil - } - - updates := make([]bulk.MultiOp, 0, len(pending)) - - simpleCache := make(map[pendingT][]byte) - - nowTimestamp := start.UTC().Format(time.RFC3339) - - var err error - var needRefresh bool - for id, pendingData := range pending { - var body []byte - if pendingData.extra == nil { - // agents that checkin without extra attributes are cachable - // Cacheable agents can share the same status, message, and unhealthy reason. Timestamps are ignored. - // This prevents an extra JSON serialization when agents have the same update body. 
- var ok bool - body, ok = simpleCache[pendingData] - if !ok { - body, err = toUpdateBody(nowTimestamp, pendingData) - if err != nil { - return err - } - simpleCache[pendingData] = body - } - } else if pendingData.extra.deleteAudit { - if pendingData.extra.seqNo.IsSet() { - needRefresh = true - } - // Use a script instead of a partial doc to update if attributes need to be removed - params, err := encodeParams(nowTimestamp, pendingData) - if err != nil { - return fmt.Errorf("unable to parse checkin details as params: %w", err) - } - action := &estypes.UpdateAction{ - Script: &estypes.Script{ - Lang: &scriptlanguage.Painless, - Source: &deleteAuditAttributesScript, - Options: map[string]string{}, - Params: params, - }, - } - body, err = json.Marshal(&action) - if err != nil { - return fmt.Errorf("could not marshall script action: %w", err) - } - } else { - if pendingData.extra.seqNo.IsSet() { - needRefresh = true - } - body, err = toUpdateBody(nowTimestamp, pendingData) - if err != nil { - return err - } - } - +// flushConnected does an update that updates all the updated_at fields on all currently +// connected agents. +func (bc *Bulk) flushConnected(ctx context.Context) error { + nowTimestamp := time.Now().UTC().Format(time.RFC3339) + fields := bulk.UpdateFields{ + dl.FieldUpdatedAt: nowTimestamp, + } + body, err := fields.Marshal() + if err != nil { + return fmt.Errorf("marshal updated_at field error: %w", err) + } + bc.mut.RLock() + updates := make([]bulk.MultiOp, 0, len(bc.connected)) + for id := range bc.connected { updates = append(updates, bulk.MultiOp{ ID: id, Body: body, Index: dl.FleetAgents, }) } - - var opts []bulk.Opt - if needRefresh { - opts = append(opts, bulk.WithRefresh()) + bc.mut.RUnlock() + _, err = bc.bulker.MUpdate(ctx, updates) + if err != nil { + return fmt.Errorf("mupdate error: %w", err) } - - _, err = bc.bulker.MUpdate(ctx, updates, opts...) - - zerolog.Ctx(ctx).Trace(). - Err(err). - Dur("rtt", time.Since(start)). 
- Int("cnt", len(updates)). - Bool("refresh", needRefresh). - Msg("Flush updates") - - return err + return nil } -func toUpdateBody(now string, pending pendingT) ([]byte, error) { +func toUpdateBody(state checkinT) ([]byte, error) { fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: now, // Set "updated_at" to the current timestamp - dl.FieldLastCheckin: pending.ts, // Set the checkin timestamp - dl.FieldLastCheckinStatus: pending.status, // Set the pending status - dl.FieldLastCheckinMessage: pending.message, // Set the status message - dl.FieldUnhealthyReason: pending.unhealthyReason, - } - if pending.agentPolicyID != "" { - fields[dl.FieldAgentPolicyID] = pending.agentPolicyID - fields[dl.FieldPolicyRevisionIdx] = pending.revisionIDX - } - if pending.extra != nil { - // If the agent version is not empty it needs to be updated - // Assuming the agent can by upgraded keeping the same id, but incrementing the version - if pending.extra.ver != "" { - fields[dl.FieldAgent] = map[string]interface{}{ - dl.FieldAgentVersion: pending.extra.ver, - } + dl.FieldUpdatedAt: state.ts, + dl.FieldLastCheckin: state.ts, + dl.FieldUnhealthyReason: state.unhealthyReason, + } + if state.status.isSet { + fields[dl.FieldLastCheckinStatus] = state.status.value + } + if state.message.isSet { + fields[dl.FieldLastCheckinMessage] = state.message.value + } + if state.agentPolicyID.isSet { + fields[dl.FieldAgentPolicyID] = state.agentPolicyID.value + } + if state.revisionIDX.isSet { + fields[dl.FieldPolicyRevisionIdx] = state.revisionIDX.value + } + // If the agent version is not empty it needs to be updated + // Assuming the agent can by upgraded keeping the same id, but incrementing the version + if state.ver != "" { + fields[dl.FieldAgent] = map[string]interface{}{ + dl.FieldAgentVersion: state.ver, } + } - // Update local metadata if provided (and has a value) - if len(pending.extra.meta) > 0 { - // Surprise: The json encoder compacts this raw JSON during - // the encode process, so 
there my be unexpected memory overhead: - // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 - fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta) - } + // Update local metadata if provided (and has a value) + if len(state.meta) > 0 { + // Surprise: The json encoder compacts this raw JSON during + // the encode process, so there my be unexpected memory overhead: + // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 + fields[dl.FieldLocalMetadata] = json.RawMessage(state.meta) + } - // Update components if provided (and has a value) - if len(pending.extra.components) > 0 { - fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) - } + // Update components if provided (and has a value) + if len(state.components) > 0 { + fields[dl.FieldComponents] = json.RawMessage(state.components) + } - // If seqNo changed, set the field appropriately - if pending.extra.seqNo.IsSet() { - fields[dl.FieldActionSeqNo] = pending.extra.seqNo - } + if state.upgradeDetails.isSet { + fields[dl.FieldUpgradeDetails] = state.upgradeDetails.value + } + if state.upgradeStartedAt.isSet { + fields[dl.FieldUpgradeStartedAt] = state.upgradeStartedAt.value + } + if state.upgradeStatus.isSet { + fields[dl.FieldUpgradeStatus] = state.upgradeStatus.value } - return fields.Marshal() -} -func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error) { - var ( - tsNow json.RawMessage - ts json.RawMessage - status json.RawMessage - message json.RawMessage - reason json.RawMessage - - // optional attributes below - policyID json.RawMessage - revisionIDX json.RawMessage - ver json.RawMessage - meta json.RawMessage - components json.RawMessage - isSet json.RawMessage - seqNo json.RawMessage - - err error - ) - tsNow, err = json.Marshal(now) - Err := errors.Join(err) - ts, err = json.Marshal(data.ts) - Err = errors.Join(Err, err) - 
status, err = json.Marshal(data.status) - Err = errors.Join(Err, err) - message, err = json.Marshal(data.message) - Err = errors.Join(Err, err) - reason, err = json.Marshal(data.unhealthyReason) - Err = errors.Join(Err, err) - policyID, err = json.Marshal(data.agentPolicyID) - Err = errors.Join(Err, err) - revisionIDX, err = json.Marshal(data.revisionIDX) - Err = errors.Join(Err, err) - ver, err = json.Marshal(data.extra.ver) - Err = errors.Join(Err, err) - isSet, err = json.Marshal(data.extra.seqNo.IsSet()) - Err = errors.Join(Err, err) - seqNo, err = json.Marshal(data.extra.seqNo) - Err = errors.Join(Err, err) - if data.extra.meta != nil { - meta, err = json.Marshal(data.extra.meta) - Err = errors.Join(Err, err) - } - if data.extra.components != nil { - components, err = json.Marshal(data.extra.components) - Err = errors.Join(Err, err) - } - if Err != nil { - return nil, Err - } - return map[string]json.RawMessage{ - "Now": tsNow, - "TS": ts, - "Status": status, - "Message": message, - "UnhealthyReason": reason, - "PolicyID": policyID, - "RevisionIDX": revisionIDX, - "Ver": ver, - "Meta": meta, - "Components": components, - "SeqNoSet": isSet, - "SeqNo": seqNo, - }, nil + // If seqNo changed, set the field appropriately + if state.seqNo.IsSet() { + fields[dl.FieldActionSeqNo] = state.seqNo + } + return fields.Marshal() } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index 4e325707b6..a5c2eafe92 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "testing" - "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" @@ -28,21 +27,8 @@ import ( // Test with seq no // matchOp is used with mock.MatchedBy to match and validate the operation -func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) bool { - return func(ops []bulk.MultiOp) bool { - if len(ops) != 1 { - 
return false - } - if ops[0].ID != c.id { - return false - } - if ops[0].Index != dl.FleetAgents { - return false - } - tb.Log("Operation match! validating details...") - - // Decode and match operation - // NOTE putting the extra validation here seems strange, maybe we should read the args in the test body intstead? +func matchOp(tb testing.TB, c testcase) func([]byte) bool { + return func(body []byte) bool { type updateT struct { LastCheckin string `json:"last_checkin"` Status string `json:"last_checkin_status"` @@ -54,14 +40,14 @@ func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) b } m := make(map[string]updateT) - err := json.Unmarshal(ops[0].Body, &m) - require.NoErrorf(tb, err, "unable to validate operation body %s", string(ops[0].Body)) + err := json.Unmarshal(body, &m) + require.NoErrorf(tb, err, "unable to validate operation body %s", string(body)) sub, ok := m["doc"] require.True(tb, ok, "unable to validate operation: expected doc") - validateTimestamp(tb, ts.Truncate(time.Second), sub.LastCheckin) - validateTimestamp(tb, ts.Truncate(time.Second), sub.UpdatedAt) + assert.NotEmpty(tb, sub.LastCheckin) + assert.Equal(tb, sub.LastCheckin, sub.UpdatedAt) // should have same timestamp assert.Equal(tb, c.policyID, sub.AgentPolicyID) assert.Equal(tb, c.revisionIDX, sub.RevisionIDX) if c.seqno != nil { @@ -93,8 +79,6 @@ type testcase struct { } func TestBulkSimple(t *testing.T) { - start := time.Now() - const ver = "8.9.0" cases := []testcase{{ name: "Simple case", @@ -168,7 +152,7 @@ func TestBulkSimple(t *testing.T) { t.Run(c.name, func(t *testing.T) { ctx := testlog.SetLogger(t).WithContext(t.Context()) mockBulk := ftesting.NewMockBulk() - mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() + mockBulk.On("Update", mock.Anything, dl.FleetAgents, c.id, mock.MatchedBy(matchOp(t, c)), mock.Anything).Return(nil).Once() bc := NewBulk(mockBulk) 
opts := []Option{WithStatus(c.status), WithMessage(c.message)} @@ -191,9 +175,7 @@ func TestBulkSimple(t *testing.T) { opts = append(opts, WithUnhealthyReason(c.unhealthyReason)) } - err := bc.CheckIn(c.id, opts...) - require.NoError(t, err) - err = bc.flush(ctx) + err := bc.CheckIn(ctx, c.id, opts...) require.NoError(t, err) mockBulk.AssertExpectations(t) @@ -201,66 +183,6 @@ func TestBulkSimple(t *testing.T) { } } -func TestBulkReusePending(t *testing.T) { - ctx := testlog.SetLogger(t).WithContext(t.Context()) - - const ( - agentID = "test-agent-id" - status = "online" - message = "test message" - ) - - meta := []byte(`{"test":"metadata"}`) - - // Matcher that validates both the existing field (status) and new field (meta) are present - matchAccumulatedOps := func(ops []bulk.MultiOp) bool { - if len(ops) != 1 { - t.Errorf("Expected 1 operation, got %d", len(ops)) - return false - } - if ops[0].ID != agentID { - t.Errorf("Expected ID %s, got %s", agentID, ops[0].ID) - return false - } - - type updateT struct { - Status string `json:"last_checkin_status"` - Meta json.RawMessage `json:"local_metadata"` - } - - m := make(map[string]updateT) - err := json.Unmarshal(ops[0].Body, &m) - require.NoErrorf(t, err, "unable to validate operation body %s", string(ops[0].Body)) - - sub, ok := m["doc"] - require.True(t, ok, "unable to validate operation: expected doc") - - assert.Equal(t, status, sub.Status, "Expected status from first CheckIn to be preserved") - assert.Equal(t, json.RawMessage(meta), sub.Meta, "Expected metadata from second CheckIn to be added") - return true - } - - mockBulk := ftesting.NewMockBulk() - mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchAccumulatedOps), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() - - bc := NewBulk(mockBulk) - - err := bc.CheckIn(agentID, WithStatus(status), WithMessage(message)) - require.NoError(t, err) - err = bc.CheckIn(agentID, WithMeta(meta)) - require.NoError(t, err) - err = bc.flush(ctx) - 
require.NoError(t, err) - - mockBulk.AssertExpectations(t) -} - -func validateTimestamp(tb testing.TB, start time.Time, ts string) { - t1, err := time.Parse(time.RFC3339, ts) - require.NoErrorf(tb, err, "expected %q to be in RFC 3339 format", ts) - require.False(tb, start.After(t1), "timestamp in the past") -} - func benchmarkBulk(n int, b *testing.B) { mockBulk := ftesting.NewMockBulk() bc := NewBulk(mockBulk) @@ -275,10 +197,7 @@ func benchmarkBulk(n int, b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { for _, id := range ids { - err := bc.CheckIn(id) - if err != nil { - b.Fatal(err) - } + bc.Add(id) } } } @@ -301,14 +220,11 @@ func benchmarkFlush(n int, b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() for _, id := range ids { - err := bc.CheckIn(id) // TODO ths benchmark is not very interesting as the simplecache is used - if err != nil { - b.Fatal(err) - } + bc.Add(id) } b.StartTimer() - err := bc.flush(ctx) + err := bc.flushConnected(ctx) if err != nil { b.Fatal(err) } diff --git a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless index 3b0f111865..8774fee674 100644 --- a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless +++ b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless @@ -1,24 +1,3 @@ -ctx._source.last_checkin = params.TS; -ctx._source.updated_at = params.Now; -ctx._source.last_checkin_status = params.Status; -ctx._source.last_checkin_message = params.Message; -ctx._source.unhealthy_reason = params.UnhealthyReason; -if (params.PolicyID != "") { - ctx._source.agent_policy_id = params.PolicyID; - ctx._source.policy_revision_idx = params.RevisionIDX; -} -if (params.Ver != "") { - ctx._source.agent.version = params.Ver; -} -if (params.Meta != null) { - ctx._source.local_metadata = params.Meta; -} -if (params.Components != null) { - ctx._source.components = params.Components; -} -if (params.SeqNoSet) { - ctx._source.action_seq_no = params.SeqNo; -} 
ctx._source.remove('audit_unenrolled_reason'); ctx._source.remove('audit_unenrolled_time'); ctx._source.remove('unenrolled_at'); // audit/unenroll no longer sets this but it did in the 8.16.x releases, so we need to clear it. diff --git a/internal/pkg/policy/sub.go b/internal/pkg/policy/sub.go index e767c35f29..31f8830bc8 100644 --- a/internal/pkg/policy/sub.go +++ b/internal/pkg/policy/sub.go @@ -77,7 +77,7 @@ func (n *subT) isEmpty() bool { func (n *subT) isUpdate(policy *model.Policy) bool { pRevIdx := policy.RevisionIdx - return pRevIdx > n.revIdx + return pRevIdx != n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index ef8aeb29b8..6d5b155f55 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -521,7 +521,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst, bulker) g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run)) - bc := checkin.NewBulk(bulker) + bc := checkin.NewBulk(bulker, checkin.WithFlushInterval(cfg.Inputs[0].Server.Timeouts.CheckinTimestamp)) g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) ct, err := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, bulker) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index f3f04274cd..e0e410dbcf 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -65,7 +65,14 @@ const ( }` checkinBody = `{ "status": "online", - "message": "checkin ok" + "message": "checkin ok", + "local_metadata": { + "elastic": { + "agent": { + "version":"9.3.0" + } + } + } }` ) From 9071d5a9c207f71b13e33b3aea91262b90bd35f7 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 4 Nov 2025 
18:34:22 -0500 Subject: [PATCH 05/11] Improve checkin. --- internal/pkg/api/handleCheckin.go | 165 +++++---- internal/pkg/api/handleCheckin_test.go | 350 ++++++++++-------- internal/pkg/checkin/bulk.go | 153 ++++---- internal/pkg/checkin/bulk_test.go | 14 +- internal/pkg/server/fleet_integration_test.go | 9 +- 5 files changed, 379 insertions(+), 312 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 78098e6b33..d3fbd21506 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -169,8 +169,8 @@ func invalidateAPIKeysOfInactiveAgent(ctx context.Context, zlog zerolog.Logger, type validatedCheckin struct { req *CheckinRequest dur time.Duration - rawMeta []byte - rawComp []byte + rawMeta json.RawMessage + rawComp json.RawMessage seqno sqn.SeqNo unhealthyReason *[]string } @@ -275,80 +275,37 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r seqno := validated.seqno unhealthyReason := validated.unhealthyReason - // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. - // Older agents will communicate any issues with upgrades via the Ack endpoint. - upgradeCheckinOpts, err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails) - if err != nil { - return fmt.Errorf("failed to update upgrade_details: %w", err) - } - - // Handle the policy ID and the revision idx upon check-in. We want to always ensure that - // the agent document represents the current state of this check-in. - policyID := agent.PolicyID - revIdx := agent.PolicyRevisionIdx - if !ct.cfg.Features.IgnoreCheckinPolicyID && req.AgentPolicyId != nil && req.PolicyRevisionIdx != nil { - policyID = *req.AgentPolicyId - revIdx = *req.PolicyRevisionIdx - zlog.Debug(). - Str("policy_id", agent.PolicyID). - Str("checkin_policy_id", policyID). - Int64("revision_idx", agent.PolicyRevisionIdx). - Int64("checkin_revision_idx", revIdx). 
- Msg("checkin included policy_id and revision_idx") - } - - checkinOpts := []checkin.Option{ + checkinOpts := make([]checkin.Option, 0, 14) // maximum of 14 (pre-allocate to reduce allocs) + checkinOpts = append(checkinOpts, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), - checkin.WithAgentPolicyID(policyID), - checkin.WithPolicyRevisionIDX(revIdx), - checkin.WithMeta(rawMeta), - checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""), + ) + if rawMeta != nil && len(rawMeta) > 0 { + checkinOpts = append(checkinOpts, checkin.WithMeta(&rawMeta)) } - if upgradeCheckinOpts != nil { - checkinOpts = append(checkinOpts, upgradeCheckinOpts...) + if rawComponents != nil && len(rawComponents) > 0 { + checkinOpts = append(checkinOpts, checkin.WithComponents(&rawComponents)) } - // determine the effective policy ID and revision idx to use for the policy monitor. - // this can be different based on the situations to force a POLICY_CHANGE. - effectivePolicyID := policyID - effectiveRevIdx := revIdx - for _, output := range agent.Outputs { - if output.APIKey == "" { - // use revision_idx=0 if the agent has a single output where no API key is defined - // This will force the policy monitor to emit a new policy to regerate API keys - effectiveRevIdx = 0 - break - } + // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. + // Older agents will communicate any issues with upgrades via the Ack endpoint. 
+ // checkinOpts is passed in to reduce allocations + checkinOpts, err = ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails, checkinOpts) + if err != nil { + return fmt.Errorf("failed to update upgrade_details: %w", err) } - if effectivePolicyID != agent.PolicyID || effectiveRevIdx != agent.PolicyRevisionIdx { - // the agent is either on the wrong policy or the wrong revision of that policy - zlog.Info(). - Str("policy_id", agent.PolicyID). - Str("effective_policy_id", effectivePolicyID). - Int64("revision_idx", agent.PolicyRevisionIdx). - Int64("effective_revision_idx", effectiveRevIdx). - Msg("effective policy is different; resulting in a POLICY_CHANGE") - // skipped when ignore checkin is enabled, because this never used to occur until this started - // using the policy information in the checkin body - if !ct.cfg.Features.IgnoreCheckinPolicyID { - // this invalidates any of the API keys - for outputName, output := range agent.Outputs { - if output.Type != policy.OutputTypeElasticsearch { - continue - } - err = updateAPIKey(r.Context(), zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName) - if err != nil { - // Only returns ErrUpdatingInactiveAgent - return fmt.Errorf("failed to update policy details: %w", err) - } - } - } - } + // Handle the policy ID and the revision idx upon check-in. We want to always ensure that + // the agent document represents the current state of this check-in. 
+ var effectivePolicyID string + var effectiveRevIdx int64 + checkinOpts, effectivePolicyID, effectiveRevIdx, err = ct.processPolicyDetails(r.Context(), zlog, agent, req, checkinOpts) + if err != nil { + return fmt.Errorf("failed to process policy details: %w", err) } // Subscribe to actions dispatcher @@ -482,16 +439,17 @@ func (ct *CheckinT) verifyActionExists(vCtx context.Context, vSpan *apm.Span, ag // if the agent doc and checkin details are both nil the method is a nop // if the checkin upgrade_details is nil but there was a previous value in the agent doc, fleet-server treats it as a successful upgrade // otherwise the details are validated; action_id is checked and upgrade_details.metadata is validated based on upgrade_details.state and the agent doc is updated. -func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails) ([]checkin.Option, error) { +func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails, checkinOpts []checkin.Option) ([]checkin.Option, error) { if details == nil { if agent.UpgradeDetails == nil { return nil, nil } - return []checkin.Option{ + checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(nil), checkin.WithUpgradeStartedAt(nil), checkin.WithUpgradeStatus(nil), - }, nil + ) + return checkinOpts, nil } // update docs with in progress details @@ -583,13 +541,12 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen if err != nil { return nil, fmt.Errorf("failed to marshal upgrade details: %w", err) } - opts := []checkin.Option{ - checkin.WithUpgradeDetails(&detailsBody), - } + detailsBodyJSON := json.RawMessage(detailsBody) + checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(&detailsBodyJSON)) if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { - opts = append(opts, checkin.WithUpgradeAttempts(nil)) + checkinOpts = append(checkinOpts, 
checkin.WithUpgradeAttempts(nil)) } - return opts, nil + return checkinOpts, nil } func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, agent *model.Agent, resp CheckinResponse) error { @@ -1215,3 +1172,65 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu return pollDuration, jitter } + +func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest, checkinOpts []checkin.Option) ([]checkin.Option, string, int64, error) { + // Handle the policy ID and the revision idx upon check-in. We want to always ensure that + // the agent document represents the current state of this check-in. + policyID := agent.PolicyID + revIdx := agent.PolicyRevisionIdx + if !ct.cfg.Features.IgnoreCheckinPolicyID && req.AgentPolicyId != nil && *req.AgentPolicyId != "" && req.PolicyRevisionIdx != nil && *req.PolicyRevisionIdx >= 0 { + policyID = *req.AgentPolicyId + revIdx = *req.PolicyRevisionIdx + zlog.Debug(). + Str("policy_id", agent.PolicyID). + Str("checkin_policy_id", policyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("checkin_revision_idx", revIdx). + Msg("checkin included policy_id and revision_idx") + checkinOpts = append(checkinOpts, checkin.WithAgentPolicyID(policyID), checkin.WithPolicyRevisionIDX(revIdx)) + } + + // determine the effective policy ID and revision idx to use for the policy monitor. + // this can be different based on the situations to force a POLICY_CHANGE. 
+ effectivePolicyID := policyID + effectiveRevIdx := revIdx + for _, output := range agent.Outputs { + if output.APIKey == "" { + // use revision_idx=0 if the agent has a single output where no API key is defined + // This will force the policy monitor to emit a new policy to regenerate API keys + effectiveRevIdx = 0 + break + } + } + if effectivePolicyID != agent.PolicyID { + // different policyID so the revisionIdx gets cleared + effectiveRevIdx = 0 + } + if effectivePolicyID != agent.PolicyID || effectiveRevIdx != agent.PolicyRevisionIdx { + // the agent is either on the wrong policy or the wrong revision of that policy + zlog.Info(). + Str("policy_id", agent.PolicyID). + Str("effective_policy_id", effectivePolicyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("effective_revision_idx", effectiveRevIdx). + Msg("effective policy is different; resulting in a POLICY_CHANGE") + + // skipped when ignore checkin is enabled, because this never used to occur until this started + // using the policy information in the checkin body + if !ct.cfg.Features.IgnoreCheckinPolicyID { + // this invalidates any of the API keys + for outputName, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName) + if err != nil { + // Only returns ErrUpdatingInactiveAgent + return nil, "", 0, fmt.Errorf("failed to update policy details: %w", err) + } + } + } + } + + return checkinOpts, effectivePolicyID, effectiveRevIdx, nil +} diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index c52a324052..a9f9d2aaaf 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" 
"github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -376,28 +375,22 @@ func TestResolveSeqNo(t *testing.T) { func TestProcessUpgradeDetails(t *testing.T) { esd := model.ESDocument{Id: "doc-ID"} - doc := bulk.UpdateFields{ - dl.FieldUpgradeDetails: &UpgradeDetails{ - ActionId: "test-action", - State: UpgradeDetailsStateUPGWATCHING, - }, - dl.FieldUpgradeAttempts: nil, - } - body, err := doc.Marshal() - if err != nil { - t.Fatalf("marshal error: %v", err) - } tests := []struct { name string agent *model.Agent details *UpgradeDetails - opts []checkin.Option + bulk func() *ftesting.MockBulk cache func() *testcache.MockCache err error }{{ name: "agent and checkin details are nil", agent: &model.Agent{ESDocument: esd}, details: nil, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk + }, cache: func() *testcache.MockCache { return testcache.NewMockCache() }, @@ -470,6 +463,7 @@ func TestProcessUpgradeDetails(t *testing.T) { bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() mBulk.On("Search", mock.Anything, dl.FleetActions, mock.Anything, mock.Anything).Return(&es.ResultT{}, es.ErrNotFound) + noUpgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -506,7 +500,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":"2023:01:02T12:00:00Z"}`)}, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -523,7 +519,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":""}`)}, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + 
noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -539,7 +537,9 @@ func TestProcessUpgradeDetails(t *testing.T) { State: UpgradeDetailsStateUPGSCHEDULED, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -557,7 +557,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -576,7 +576,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -668,7 +668,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -687,7 +687,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -706,7 +706,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", 
mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -724,7 +724,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":""}`)}, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -741,7 +743,18 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", body, mock.Anything, mock.Anything).Return(nil) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + upgradeAttempts, ok := doc.Doc[dl.FieldUpgradeAttempts] + return upgradeDetails && ok && upgradeAttempts == nil && doc.Doc[dl.FieldUpgradedAt] != "" + }), mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -757,23 +770,63 @@ func TestProcessUpgradeDetails(t *testing.T) { mBulk := tc.bulk() mCache := tc.cache() + bc := checkin.NewBulk(mBulk) ct := &CheckinT{ cache: mCache, + bc: bc, bulker: mBulk, } - opts, err := ct.processUpgradeDetails(context.Background(), tc.agent, tc.details) + var err error + opts := make([]checkin.Option, 0, 3) + opts, err = ct.processUpgradeDetails(context.Background(), tc.agent, tc.details, opts) if tc.err == nil { - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.ErrorIs(t, err, tc.err) + require.ErrorIs(t, err, tc.err) } + + err = 
bc.CheckIn(context.Background(), tc.agent.Id, opts...) + if err != nil { + require.NoError(t, err) + } + mBulk.AssertExpectations(t) mCache.AssertExpectations(t) }) } } +func noUpgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, noUpgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + _, noUpgradeStartedAt := doc.Doc[dl.FieldUpgradeStartedAt] + _, noUpgradeStatus := doc.Doc[dl.FieldUpgradeStatus] + return !noUpgradeDetails && !noUpgradeStartedAt && !noUpgradeStatus && doc.Doc[dl.FieldUpgradedAt] != "" + }), mock.Anything, mock.Anything).Return(nil) +} + +func upgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + return upgradeDetails && doc.Doc[dl.FieldUpgradedAt] != "" + }), mock.Anything, mock.Anything).Return(nil) +} + func Test_CheckinT_writeResponse(t *testing.T) { tests := []struct { name string @@ -1255,34 +1308,36 @@ func TestValidateCheckinRequest(t *testing.T) { } func TestProcessPolicyDetails(t *testing.T) { + esd := model.ESDocument{Id: "doc-ID"} policyID := "policy-id" revIDX2 := int64(2) tests := []struct { - name string - agent *model.Agent - req *CheckinRequest - getPolicyMonitor func() *mockPolicyMonitor - revIDX int64 - returnsOpts bool - err error + name string + agent *model.Agent + req *CheckinRequest + policyID string + revIdx int64 + bulk func() *ftesting.MockBulk + ignoreCheckin bool }{{ name: "request 
has no policy details", agent: &model.Agent{ + ESDocument: esd, + PolicyID: policyID, PolicyRevisionIdx: 1, }, - req: &CheckinRequest{}, - getPolicyMonitor: func() *mockPolicyMonitor { - return &mockPolicyMonitor{} + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 1, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk }, - revIDX: 1, - returnsOpts: false, - err: nil, }, { name: "policy reassign detected", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: "new-policy-id", AgentPolicyID: policyID, PolicyRevisionIdx: 2, @@ -1291,40 +1346,17 @@ func TestProcessPolicyDetails(t *testing.T) { AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - return &mockPolicyMonitor{} + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 0, - returnsOpts: false, - err: nil, }, { name: "revision updated", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, - PolicyID: policyID, - AgentPolicyID: policyID, - PolicyRevisionIdx: 1, - }, - req: &CheckinRequest{ - AgentPolicyId: &policyID, - PolicyRevisionIdx: &revIDX2, - }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm - }, - revIDX: 2, - returnsOpts: true, - err: nil, - }, { - name: "checkin revision is greater than the policy's latest revision", - agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: policyID, AgentPolicyID: policyID, PolicyRevisionIdx: 1, @@ -1333,63 +1365,54 @@ func TestProcessPolicyDetails(t *testing.T) { AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := 
&mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(1)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 0, - returnsOpts: true, - err: nil, }, { - name: "agent_policy_id has changed", + name: "agent does not have agent_policy_id present", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: policyID, - AgentPolicyID: "old-policy-id", - PolicyRevisionIdx: 1, + PolicyRevisionIdx: 2, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 2, - returnsOpts: true, - err: nil, }, { - name: "agent does not have agent_policy_id present", + name: "details present with no changes for agent doc", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, + AgentPolicyID: policyID, PolicyID: policyID, - PolicyRevisionIdx: 2, + PolicyRevisionIdx: revIDX2, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 2, - returnsOpts: true, - err: nil, }, { - name: "details present with no changes from agent doc", + name: "details present ignore checkin", agent: 
&model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, AgentPolicyID: policyID, PolicyID: policyID, PolicyRevisionIdx: revIDX2, @@ -1398,63 +1421,82 @@ func TestProcessPolicyDetails(t *testing.T) { AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk }, - revIDX: 2, - returnsOpts: false, - err: nil, + ignoreCheckin: true, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { logger := testlog.SetLogger(t) - pm := tc.getPolicyMonitor() - checkin := &CheckinT{ - cfg: &config.Server{}, - bulker: ftesting.NewMockBulk(), - pm: pm, - } - revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req) - assert.Equal(t, tc.revIDX, revIDX) - if tc.returnsOpts { - assert.NotEmpty(t, opts) - } else { - assert.Empty(t, opts) + mBulk := tc.bulk() + bc := checkin.NewBulk(mBulk) + cfg := &config.Server{} + if tc.ignoreCheckin { + cfg.Features.IgnoreCheckinPolicyID = true } - if tc.err != nil { - assert.ErrorIs(t, tc.err, err) - } else { - assert.NoError(t, err) + ct := &CheckinT{ + bc: bc, + bulker: mBulk, + cfg: cfg, } - pm.AssertExpectations(t) + + opts := make([]checkin.Option, 0, 2) + opts, ePolicyID, eRevIdx, err := ct.processPolicyDetails(t.Context(), logger, tc.agent, tc.req, opts) + require.NoError(t, err) + assert.Equal(t, tc.policyID, ePolicyID) + assert.Equal(t, tc.revIdx, eRevIdx) + + err = bc.CheckIn(t.Context(), tc.agent.Id, opts...) 
+ assert.NoError(t, err) + + mBulk.AssertExpectations(t) }) } +} - t.Run("IgnoreCheckinPolicyID flag is set", func(t *testing.T) { - logger := testlog.SetLogger(t) - checkin := &CheckinT{ - cfg: &config.Server{ - Features: config.FeatureFlags{ - IgnoreCheckinPolicyID: true, - }, - }, +func noPolicyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false } - revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, - &model.Agent{ - PolicyID: policyID, - PolicyRevisionIdx: 1, - }, - &CheckinRequest{ - AgentPolicyId: &policyID, - PolicyRevisionIdx: &revIDX2, - }, - ) - assert.NoError(t, err) - assert.Equal(t, int64(1), revIDX) - assert.Empty(t, opts) - }) + _, noAgentPolicyID := doc.Doc[dl.FieldAgentPolicyID] + _, noPolicyRevisionIdx := doc.Doc[dl.FieldPolicyRevisionIdx] + return !noAgentPolicyID && !noPolicyRevisionIdx && doc.Doc[dl.FieldUpgradedAt] != "" + }), mock.Anything, mock.Anything).Return(nil) +} + +func policyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk, policyID string, revIdx int64) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + oPolicyID, hasPolicy := doc.Doc[dl.FieldAgentPolicyID] + if !hasPolicy { + return false + } + oRevIdx, hasRevIdx := doc.Doc[dl.FieldPolicyRevisionIdx] + if !hasRevIdx { + return false + } + oRevIdxF, ok := oRevIdx.(float64) + if !ok { + return false + } + return oPolicyID == policyID && int64(oRevIdxF) == revIdx && doc.Doc[dl.FieldUpgradedAt] != "" + }), mock.Anything, mock.Anything).Return(nil) } 
diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 9493e23a9e..4465fe5712 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -59,17 +59,15 @@ func WithMessage(message string) Option { func WithUnhealthyReason(reason *[]string) Option { return func(state *checkinT) { - state.unhealthyReason = reason + state.unhealthyReason.isSet = true + state.unhealthyReason.value = reason } } -func WithMeta(meta []byte) Option { +func WithMeta(meta *json.RawMessage) Option { return func(state *checkinT) { - if len(meta) == 0 { - // no real meta; do nothing - return - } - state.meta = meta + state.meta.isSet = true + state.meta.value = meta } } @@ -84,17 +82,15 @@ func WithSeqNo(seqno sqn.SeqNo) Option { func WithVer(ver string) Option { return func(state *checkinT) { - state.ver = ver + state.ver.isSet = true + state.ver.value = ver } } -func WithComponents(components []byte) Option { +func WithComponents(components *json.RawMessage) Option { return func(state *checkinT) { - if len(components) == 0 { - // no real components; do nothing - return - } - state.components = components + state.components.isSet = true + state.components.value = components } } @@ -121,7 +117,7 @@ func WithPolicyRevisionIDX(idx int64) Option { } } -func WithUpgradeDetails(details *[]byte) Option { +func WithUpgradeDetails(details *json.RawMessage) Option { return func(state *checkinT) { state.upgradeDetails.isSet = true state.upgradeDetails.value = details @@ -158,18 +154,18 @@ type checkinT struct { ts string seqNo sqn.SeqNo - ver string + ver setT[string] status setT[string] message setT[string] - unhealthyReason *[]string + unhealthyReason setT[*[]string] agentPolicyID setT[string] revisionIDX setT[int64] - meta []byte - components []byte + meta setT[*json.RawMessage] + components setT[*json.RawMessage] - upgradeDetails setT[*[]byte] + upgradeDetails setT[*json.RawMessage] upgradeStartedAt setT[*string] upgradeStatus setT[*string] upgradeAttempts 
setT[*[]string]
@@ -177,6 +173,65 @@ type checkinT struct {
 deleteAudit bool
 }

+func (c *checkinT) toBody() ([]byte, error) {
+ fields := bulk.UpdateFields{
+ dl.FieldUpdatedAt: c.ts,
+ dl.FieldLastCheckin: c.ts,
+ dl.FieldUnhealthyReason: c.unhealthyReason,
+ }
+ if c.status.isSet {
+ fields[dl.FieldLastCheckinStatus] = c.status.value
+ }
+ if c.message.isSet {
+ fields[dl.FieldLastCheckinMessage] = c.message.value
+ }
+ if c.agentPolicyID.isSet {
+ fields[dl.FieldAgentPolicyID] = c.agentPolicyID.value
+ }
+ if c.revisionIDX.isSet {
+ fields[dl.FieldPolicyRevisionIdx] = c.revisionIDX.value
+ }
+ // If the agent version is not empty it needs to be updated
+ // Assuming the agent can be upgraded keeping the same id, but incrementing the version
+ if c.ver.isSet && c.ver.value != "" {
+ fields[dl.FieldAgent] = map[string]interface{}{
+ dl.FieldAgentVersion: c.ver.value,
+ }
+ }
+
+ // Update local metadata if provided (and has a value)
+ if c.meta.isSet {
+ // Surprise: The json encoder compacts this raw JSON during
+ // the encode process, so there may be unexpected memory overhead:
+ // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507
+ fields[dl.FieldLocalMetadata] = c.meta.value
+ }
+
+ // Update components if provided (and has a value)
+ if c.components.isSet {
+ fields[dl.FieldComponents] = c.components.value
+ }
+
+ if c.upgradeDetails.isSet {
+ fields[dl.FieldUpgradeDetails] = c.upgradeDetails.value
+ }
+ if c.upgradeStartedAt.isSet {
+ fields[dl.FieldUpgradeStartedAt] = c.upgradeStartedAt.value
+ }
+ if c.upgradeStatus.isSet {
+ fields[dl.FieldUpgradeStatus] = c.upgradeStatus.value
+ }
+ if c.upgradeAttempts.isSet {
+ fields[dl.FieldUpgradeAttempts] = c.upgradeAttempts.value
+ }
+
+ // If seqNo changed, set the field appropriately
+ if c.seqNo.IsSet() {
+ fields[dl.FieldActionSeqNo] = c.seqNo
+ }
+ return fields.Marshal()
+}
+
 // Bulk handles checkins and keeps connected agents updated_at timestamp
up-to-date. type Bulk struct { opts optionsT @@ -236,7 +291,7 @@ func (bc *Bulk) CheckIn(ctx context.Context, id string, opts ...Option) error { } // update the agent document - body, err := toUpdateBody(checkin) + body, err := checkin.toBody() if err != nil { return fmt.Errorf("could not marshall update body: %w", err) } @@ -317,59 +372,3 @@ func (bc *Bulk) flushConnected(ctx context.Context) error { } return nil } - -func toUpdateBody(state checkinT) ([]byte, error) { - fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: state.ts, - dl.FieldLastCheckin: state.ts, - dl.FieldUnhealthyReason: state.unhealthyReason, - } - if state.status.isSet { - fields[dl.FieldLastCheckinStatus] = state.status.value - } - if state.message.isSet { - fields[dl.FieldLastCheckinMessage] = state.message.value - } - if state.agentPolicyID.isSet { - fields[dl.FieldAgentPolicyID] = state.agentPolicyID.value - } - if state.revisionIDX.isSet { - fields[dl.FieldPolicyRevisionIdx] = state.revisionIDX.value - } - // If the agent version is not empty it needs to be updated - // Assuming the agent can by upgraded keeping the same id, but incrementing the version - if state.ver != "" { - fields[dl.FieldAgent] = map[string]interface{}{ - dl.FieldAgentVersion: state.ver, - } - } - - // Update local metadata if provided (and has a value) - if len(state.meta) > 0 { - // Surprise: The json encoder compacts this raw JSON during - // the encode process, so there my be unexpected memory overhead: - // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 - fields[dl.FieldLocalMetadata] = json.RawMessage(state.meta) - } - - // Update components if provided (and has a value) - if len(state.components) > 0 { - fields[dl.FieldComponents] = json.RawMessage(state.components) - } - - if state.upgradeDetails.isSet { - fields[dl.FieldUpgradeDetails] = state.upgradeDetails.value - } - if state.upgradeStartedAt.isSet { - fields[dl.FieldUpgradeStartedAt] = 
state.upgradeStartedAt.value - } - if state.upgradeStatus.isSet { - fields[dl.FieldUpgradeStatus] = state.upgradeStatus.value - } - - // If seqNo changed, set the field appropriately - if state.seqNo.IsSet() { - fields[dl.FieldActionSeqNo] = state.seqNo - } - return fields.Marshal() -} diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index a5c2eafe92..3f6540da80 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -71,14 +71,14 @@ type testcase struct { message string policyID string revisionIDX int64 - meta []byte - components []byte + meta json.RawMessage + components json.RawMessage seqno sqn.SeqNo ver string unhealthyReason *[]string } -func TestBulkSimple(t *testing.T) { +func TestBulkCheckin(t *testing.T) { const ver = "8.9.0" cases := []testcase{{ name: "Simple case", @@ -159,11 +159,11 @@ func TestBulkSimple(t *testing.T) { if c.policyID != "" { opts = append(opts, WithAgentPolicyID(c.policyID), WithPolicyRevisionIDX(c.revisionIDX)) } - if c.meta != nil { - opts = append(opts, WithMeta(c.meta)) + if c.meta != nil && len(c.meta) > 0 { + opts = append(opts, WithMeta(&c.meta)) } - if c.components != nil { - opts = append(opts, WithComponents(c.components)) + if c.components != nil && len(c.components) > 0 { + opts = append(opts, WithComponents(&c.components)) } if c.seqno != nil { opts = append(opts, WithSeqNo(c.seqno)) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index e0e410dbcf..7bac90de52 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -72,7 +72,14 @@ const ( "version":"9.3.0" } } - } + }, + "components": [ + { + "id": "filestream-default", + "status": "Healthy", + "message": "Healthy" + } + ] }` ) From 5290aac688f8111ed1247b54390663bf54f41ebd Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 4 Nov 2025 20:24:40 -0500 Subject: [PATCH 06/11] Fix lint. 
--- internal/pkg/api/handleCheckin.go | 4 ++-- internal/pkg/checkin/bulk_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index d3fbd21506..1ee462c9b1 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -284,10 +284,10 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""), ) - if rawMeta != nil && len(rawMeta) > 0 { + if len(rawMeta) > 0 { checkinOpts = append(checkinOpts, checkin.WithMeta(&rawMeta)) } - if rawComponents != nil && len(rawComponents) > 0 { + if len(rawComponents) > 0 { checkinOpts = append(checkinOpts, checkin.WithComponents(&rawComponents)) } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index 3f6540da80..cb58206784 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -57,7 +57,7 @@ func matchOp(tb testing.TB, c testcase) func([]byte) bool { } if c.meta != nil { - assert.Equal(tb, json.RawMessage(c.meta), sub.Meta) + assert.Equal(tb, c.meta, sub.Meta) } assert.Equal(tb, c.status, sub.Status) return true @@ -159,10 +159,10 @@ func TestBulkCheckin(t *testing.T) { if c.policyID != "" { opts = append(opts, WithAgentPolicyID(c.policyID), WithPolicyRevisionIDX(c.revisionIDX)) } - if c.meta != nil && len(c.meta) > 0 { + if len(c.meta) > 0 { opts = append(opts, WithMeta(&c.meta)) } - if c.components != nil && len(c.components) > 0 { + if len(c.components) > 0 { opts = append(opts, WithComponents(&c.components)) } if c.seqno != nil { From e9d2ab2055d918a178b44726aa64ed5cffab439f Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 5 Nov 2025 12:21:44 -0500 Subject: [PATCH 07/11] Fix missed scenarios. 
--- internal/pkg/api/handleCheckin.go | 21 ++++++++++++------- internal/pkg/checkin/bulk.go | 8 ++++--- internal/pkg/server/fleet_integration_test.go | 2 +- testing/e2e/api_version/client_api_current.go | 14 ++++++------- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 1ee462c9b1..0c6bd8c8dd 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -442,7 +442,7 @@ func (ct *CheckinT) verifyActionExists(vCtx context.Context, vSpan *apm.Span, ag func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails, checkinOpts []checkin.Option) ([]checkin.Option, error) { if details == nil { if agent.UpgradeDetails == nil { - return nil, nil + return checkinOpts, nil } checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(nil), @@ -459,7 +459,7 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen return nil, err } if action == nil { - return nil, nil + return checkinOpts, nil } // link action with APM spans @@ -1194,12 +1194,17 @@ func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logge // this can be different based on the situations to force a POLICY_CHANGE. 
effectivePolicyID := policyID effectiveRevIdx := revIdx - for _, output := range agent.Outputs { - if output.APIKey == "" { - // use revision_idx=0 if the agent has a single output where no API key is defined - // This will force the policy monitor to emit a new policy to regenerate API keys - effectiveRevIdx = 0 - break + if len(agent.Outputs) == 0 { + // no outputs yet, force policy regeneration + effectiveRevIdx = 0 + } else { + for _, output := range agent.Outputs { + if output.APIKey == "" { + // use revision_idx=0 if the agent has a single output where no API key is defined + // This will force the policy monitor to emit a new policy to regenerate API keys + effectiveRevIdx = 0 + break + } } } if effectivePolicyID != agent.PolicyID { diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 4465fe5712..e13cdbc1aa 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -175,9 +175,11 @@ type checkinT struct { func (c *checkinT) toBody() ([]byte, error) { fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: c.ts, - dl.FieldLastCheckin: c.ts, - dl.FieldUnhealthyReason: c.unhealthyReason, + dl.FieldUpdatedAt: c.ts, + dl.FieldLastCheckin: c.ts, + } + if c.unhealthyReason.isSet { + fields[dl.FieldUnhealthyReason] = c.unhealthyReason.value } if c.status.isSet { fields[dl.FieldLastCheckinStatus] = c.status.value diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 7bac90de52..36f565bbcc 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -1685,7 +1685,7 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) { _, ok = obj["audit_unenrolled_reason"] _, ok2 := obj["unenrolled_at"] return !ok && !ok2 - }, time.Second*20, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. 
agent doc: %v", obj) + }, time.Second*10, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. agent doc: %v", obj) cancel() srv.waitExit() //nolint:errcheck // test case } diff --git a/testing/e2e/api_version/client_api_current.go b/testing/e2e/api_version/client_api_current.go index 0e7afeec89..5a4e85d1b2 100644 --- a/testing/e2e/api_version/client_api_current.go +++ b/testing/e2e/api_version/client_api_current.go @@ -465,7 +465,7 @@ func (tester *ClientAPITester) TestEnrollAuditUnenroll() { tester.Require().NoError(err) _, ok := obj.Source["audit_unenrolled_reason"] return !ok - }, time.Second*20, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") + }, time.Second*10, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") } // TestEnrollUpgradeAction_MetadataDownloadRate_String checks that download metadata rates can be sent as strings @@ -576,7 +576,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*20, time.Second) + }, time.Second*10, time.Second) // Check in with revIDX that does not exist // POLICY_CHANGE should be returned @@ -612,7 +612,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, newRevIDX, int64(agent.Revision)) - }, time.Second*20, time.Second) + }, time.Second*10, time.Second) // Update policy // Get the policy then "update" it without changing anything - revision ID should increment @@ -682,7 +682,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) - }, 
time.Second*20, time.Second) + }, time.Second*10, time.Second) // Do a normal checkin to "reset" to latest revision_idx // no actions are returned @@ -704,7 +704,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*20, time.Second) + }, time.Second*10, time.Second) // Test that if the agent is "restored" to an earlier revIDX a policy_change is sent prevRev := revIDX - 1 @@ -725,7 +725,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, prevRev, int64(agent.Revision)) - }, time.Second*20, time.Second) + }, time.Second*10, time.Second) // agent is now recorded as on a previous revision - check to make sure a checkin without AgentPolicyId and revision result in a POLICY_CHANGE action tester.T().Logf("test checkin 7: agent %s with no policy or revision", agentID) @@ -765,7 +765,7 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { agent := tester.GetAgent(ctx, agentID) assert.Equal(c, policyID, agent.AgentPolicyID) assert.Equal(c, revIDX, int64(agent.Revision)) - }, time.Second*20, time.Second) + }, time.Second*10, time.Second) // sanity check agent status in kibana tester.AgentIsOnline(ctx, agentID) From b253218976661b7fa232505d93d0fc2520c565df Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 5 Nov 2025 14:35:49 -0500 Subject: [PATCH 08/11] Add unit tests. 
--- internal/pkg/api/handleCheckin_test.go | 68 ++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index a9f9d2aaaf..ec25096bca 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -1325,6 +1325,11 @@ func TestProcessPolicyDetails(t *testing.T) { ESDocument: esd, PolicyID: policyID, PolicyRevisionIdx: 1, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{}, policyID: policyID, @@ -1341,6 +1346,11 @@ func TestProcessPolicyDetails(t *testing.T) { PolicyID: "new-policy-id", AgentPolicyID: policyID, PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, @@ -1353,6 +1363,44 @@ func TestProcessPolicyDetails(t *testing.T) { policyDetailsMockCheck(t, mBulk, policyID, revIDX2) return mBulk }, + }, { + name: "no outputs", + agent: &model.Agent{ + ESDocument: esd, + PolicyID: policyID, + AgentPolicyID: policyID, + PolicyRevisionIdx: 2, + }, + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk + }, + }, { + name: "missing output APIKey", + agent: &model.Agent{ + ESDocument: esd, + PolicyID: policyID, + AgentPolicyID: policyID, + PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + "remote": &model.PolicyOutput{}, + }, + }, + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk + }, }, { name: "revision updated", agent: &model.Agent{ @@ -1360,6 +1408,11 @@ func TestProcessPolicyDetails(t *testing.T) { PolicyID: 
policyID, AgentPolicyID: policyID, PolicyRevisionIdx: 1, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, @@ -1378,6 +1431,11 @@ func TestProcessPolicyDetails(t *testing.T) { ESDocument: esd, PolicyID: policyID, PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, @@ -1397,6 +1455,11 @@ func TestProcessPolicyDetails(t *testing.T) { AgentPolicyID: policyID, PolicyID: policyID, PolicyRevisionIdx: revIDX2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, @@ -1416,6 +1479,11 @@ func TestProcessPolicyDetails(t *testing.T) { AgentPolicyID: policyID, PolicyID: policyID, PolicyRevisionIdx: revIDX2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, From 0d9acbb2f6c78c34d6a566185cc6fe1a7a2f4a5d Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 6 Nov 2025 09:51:46 -0500 Subject: [PATCH 09/11] Fix e2e test. 
--- testing/e2e/api_version/client_api_current.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/testing/e2e/api_version/client_api_current.go b/testing/e2e/api_version/client_api_current.go index 5a4e85d1b2..459cc075d1 100644 --- a/testing/e2e/api_version/client_api_current.go +++ b/testing/e2e/api_version/client_api_current.go @@ -675,15 +675,21 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { } } tester.Require().True(found, "unable to find POLICY_CHANGE action in 4th checkin response") - revIDX = int64(policyChange.Policy.Revision) - tester.Require().Equal(int64(iRev), revIDX, "Expected POLICY_CHANGE action to be for updated policy revision") + tester.Require().Equal(int64(iRev), int64(policyChange.Policy.Revision), "Expected POLICY_CHANGE action to be for updated policy revision") + // Document should reflect the current state of the Elastic Agent, which is always the revision_idx it used + // to check-in. Only an ACK or the next check-in that states the next revision would it be updated to the + // revision_idx that comes from the policy. tester.Require().EventuallyWithT(func(c *assert.CollectT) { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) }, time.Second*10, time.Second) + // Bump the revision_idx to the same as the policy to signal to Fleet that the Elastic Agent is now + // running the latest revision that it was sent. + revIDX = int64(policyChange.Policy.Revision) + // Do a normal checkin to "reset" to latest revision_idx // no actions are returned // Manage any API keys if present From 6401bb42e0009fcad2e074db2a4208ba23ba636a Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Fri, 7 Nov 2025 10:01:29 -0500 Subject: [PATCH 10/11] Update changelog entry. 
--- ...-when-multiple-fleet-servers-are-used.yaml | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml diff --git a/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml b/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml new file mode 100644 index 0000000000..827fd39e6e --- /dev/null +++ b/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: fix data race issue during checkin when multiple fleet-servers are used + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. 
+component: fleet-server + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 From bbd2ac20881ff1d822d8003e80f785282dca6cc8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 18 Nov 2025 14:24:07 -0500 Subject: [PATCH 11/11] Fix UpgradedAt and use a MUpdate for painless audit. --- internal/pkg/api/handleCheckin.go | 2 + internal/pkg/api/handleCheckin_test.go | 26 +++++++--- internal/pkg/checkin/bulk.go | 66 ++++++++++++++++++-------- internal/pkg/checkin/bulk_test.go | 46 +++++++++++++++++- 4 files changed, 112 insertions(+), 28 deletions(-) diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 92fc74c84b..6899a9e6ed 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -444,10 +444,12 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen if agent.UpgradeDetails == nil { return checkinOpts, nil } + upgradedAt := time.Now().UTC().Format(time.RFC3339) checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(nil), checkin.WithUpgradeStartedAt(nil), checkin.WithUpgradeStatus(nil), + checkin.WithUpgradedAt(&upgradedAt), ) return checkinOpts, nil } diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index 2a158719f9..758c9c6fac 
100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -409,7 +409,7 @@ func TestProcessUpgradeDetails(t *testing.T) { t.Logf("bulk match unmarshal error: %v", err) return false } - return doc.Doc[dl.FieldUpgradeDetails] == nil && doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && doc.Doc[dl.FieldUpgradedAt] != "" + return doc.Doc[dl.FieldUpgradeDetails] == nil && doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && getFieldStringValue(doc.Doc, dl.FieldUpgradedAt) != "" }), mock.Anything, mock.Anything).Return(nil) return mBulk }, @@ -753,7 +753,8 @@ func TestProcessUpgradeDetails(t *testing.T) { } _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] upgradeAttempts, ok := doc.Doc[dl.FieldUpgradeAttempts] - return upgradeDetails && ok && upgradeAttempts == nil && doc.Doc[dl.FieldUpgradedAt] != "" + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return upgradeDetails && ok && upgradeAttempts == nil && !noUpgradedAt }), mock.Anything, mock.Anything).Return(nil) return mBulk }, @@ -809,7 +810,8 @@ func noUpgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { _, noUpgradeDetails := doc.Doc[dl.FieldUpgradeDetails] _, noUpgradeStartedAt := doc.Doc[dl.FieldUpgradeStartedAt] _, noUpgradeStatus := doc.Doc[dl.FieldUpgradeStatus] - return !noUpgradeDetails && !noUpgradeStartedAt && !noUpgradeStatus && doc.Doc[dl.FieldUpgradedAt] != "" + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return !noUpgradeDetails && !noUpgradeStartedAt && !noUpgradeStatus && !noUpgradedAt }), mock.Anything, mock.Anything).Return(nil) } @@ -823,7 +825,8 @@ func upgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { return false } _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] - return upgradeDetails && doc.Doc[dl.FieldUpgradedAt] != "" + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return upgradeDetails && !noUpgradedAt }), mock.Anything, 
mock.Anything).Return(nil) } @@ -1574,7 +1577,7 @@ func noPolicyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { } _, noAgentPolicyID := doc.Doc[dl.FieldAgentPolicyID] _, noPolicyRevisionIdx := doc.Doc[dl.FieldPolicyRevisionIdx] - return !noAgentPolicyID && !noPolicyRevisionIdx && doc.Doc[dl.FieldUpgradedAt] != "" + return !noAgentPolicyID && !noPolicyRevisionIdx }), mock.Anything, mock.Anything).Return(nil) } @@ -1599,6 +1602,17 @@ func policyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk, policyID str if !ok { return false } - return oPolicyID == policyID && int64(oRevIdxF) == revIdx && doc.Doc[dl.FieldUpgradedAt] != "" + return oPolicyID == policyID && int64(oRevIdxF) == revIdx }), mock.Anything, mock.Anything).Return(nil) } + +func getFieldStringValue(doc map[string]interface{}, key string) string { + raw, ok := doc[key] + if ok { + s, ok := raw.(string) + if ok { + return s + } + } + return "" +} diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index e13cdbc1aa..cfef6f3f29 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -145,6 +145,13 @@ func WithUpgradeAttempts(attempts *[]string) Option { } } +func WithUpgradedAt(upgradedAt *string) Option { + return func(state *checkinT) { + state.upgradedAt.isSet = true + state.upgradedAt.value = upgradedAt + } +} + type setT[T any] struct { isSet bool value T @@ -169,6 +176,7 @@ type checkinT struct { upgradeStartedAt setT[*string] upgradeStatus setT[*string] upgradeAttempts setT[*[]string] + upgradedAt setT[*string] deleteAudit bool } @@ -226,6 +234,9 @@ func (c *checkinT) toBody() ([]byte, error) { if c.upgradeAttempts.isSet { fields[dl.FieldUpgradeAttempts] = c.upgradeAttempts.value } + if c.upgradedAt.isSet { + fields[dl.FieldUpgradedAt] = c.upgradedAt.value + } // If seqNo changed, set the field appropriately if c.seqNo.IsSet() { @@ -301,31 +312,44 @@ func (bc *Bulk) CheckIn(ctx context.Context, id string, opts ...Option) error { if 
checkin.seqNo.IsSet() { bulkOpts = append(bulkOpts, bulk.WithRefresh()) } - err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulkOpts...) - if err != nil { - return fmt.Errorf("failed to update document: %w", err) - } - - // deleteAudit performs a second request (being that this should not happen very often - // it's safer to only use this script when it is needed). - if checkin.deleteAudit { - action := &estypes.UpdateAction{ - Script: &estypes.Script{ - Lang: &scriptlanguage.Painless, - Source: &deleteAuditAttributesScript, - Options: map[string]string{}, - }, - } - body, err = json.Marshal(&action) - if err != nil { - return fmt.Errorf("could not marshall script action: %w", err) - } - err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulk.WithRetryOnConflict(3)) + + // no deleteAudit so a simple update will work + if !checkin.deleteAudit { + err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulkOpts...) if err != nil { - return fmt.Errorf("failed to remove audit fields with script update: %w", err) + return fmt.Errorf("failed to update document: %w", err) } + return nil } + // deleteAudit requires the usage of MUpdate to allow updating the body and then removing the needed fields. + action := &estypes.UpdateAction{ + Script: &estypes.Script{ + Lang: &scriptlanguage.Painless, + Source: &deleteAuditAttributesScript, + Options: map[string]string{}, + }, + } + actionBody, err := json.Marshal(&action) + if err != nil { + return fmt.Errorf("could not marshall script action: %w", err) + } + updates := []bulk.MultiOp{ + { + ID: id, + Index: dl.FleetAgents, + Body: body, + }, + { + ID: id, + Index: dl.FleetAgents, + Body: actionBody, + }, + } + _, err = bc.bulker.MUpdate(ctx, updates, bulkOpts...) 
+ if err != nil { + return fmt.Errorf("failed update document and remove audit fields: %w", err) + } return nil } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index cb58206784..e6415b1c5e 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -14,6 +14,8 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/sqn" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage" "github.com/google/go-cmp/cmp" "github.com/rs/xid" @@ -64,6 +66,31 @@ func matchOp(tb testing.TB, c testcase) func([]byte) bool { } } +// matchMOp is used with mock.MatchedBy to match and validate a multi update operation +func matchMOp(tb testing.TB, c testcase) func([]bulk.MultiOp) bool { + return func(updates []bulk.MultiOp) bool { + require.Len(tb, updates, 2) + + // first is the same body as matchOp checks + assert.Equal(tb, c.id, updates[0].ID) + assert.Equal(tb, dl.FleetAgents, updates[0].Index) + assert.True(tb, matchOp(tb, c)(updates[0].Body)) + + // second is the painless script to remove the audit fields + assert.Equal(tb, c.id, updates[1].ID) + assert.Equal(tb, dl.FleetAgents, updates[1].Index) + + var action estypes.UpdateAction + err := json.Unmarshal(updates[1].Body, &action) + require.NoError(tb, err, "second update body not an update action") + require.NotNil(tb, action.Script) + assert.Equal(tb, &scriptlanguage.Painless, action.Script.Lang) + assert.Equal(tb, &deleteAuditAttributesScript, action.Script.Source) + + return true + } +} + type testcase struct { name string id string @@ -76,6 +103,7 @@ type testcase struct { seqno sqn.SeqNo ver string unhealthyReason *[]string + deleteAudit bool } func TestBulkCheckin(t *testing.T) { @@ -113,6 +141,15 @@ func TestBulkCheckin(t *testing.T) 
{ meta: []byte(`{"hey":"now","brown":"cow"}`), components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), ver: ver, + }, { + name: "Multi field case w/ delete audit", + id: "multiFieldId", + status: "online", + message: "message", + meta: []byte(`{"hey":"now","brown":"cow"}`), + components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), + ver: ver, + deleteAudit: true, }, { name: "Multi field nested case", id: "multiFieldNestedId", @@ -152,7 +189,11 @@ func TestBulkCheckin(t *testing.T) { t.Run(c.name, func(t *testing.T) { ctx := testlog.SetLogger(t).WithContext(t.Context()) mockBulk := ftesting.NewMockBulk() - mockBulk.On("Update", mock.Anything, dl.FleetAgents, c.id, mock.MatchedBy(matchOp(t, c)), mock.Anything).Return(nil).Once() + if c.deleteAudit { + mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchMOp(t, c)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() + } else { + mockBulk.On("Update", mock.Anything, dl.FleetAgents, c.id, mock.MatchedBy(matchOp(t, c)), mock.Anything).Return(nil).Once() + } bc := NewBulk(mockBulk) opts := []Option{WithStatus(c.status), WithMessage(c.message)} @@ -174,6 +215,9 @@ func TestBulkCheckin(t *testing.T) { if c.unhealthyReason != nil { opts = append(opts, WithUnhealthyReason(c.unhealthyReason)) } + if c.deleteAudit { + opts = append(opts, WithDeleteAudit(true)) + } err := bc.CheckIn(ctx, c.id, opts...) require.NoError(t, err)