diff --git a/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml b/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml new file mode 100644 index 0000000000..827fd39e6e --- /dev/null +++ b/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: fix data race issue during checkin when multiple fleet-servers are used + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: fleet-server + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. 
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 1aa720f8a7..d51280a905 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -169,8 +169,8 @@ func invalidateAPIKeysOfInactiveAgent(ctx context.Context, zlog zerolog.Logger, type validatedCheckin struct { req *CheckinRequest dur time.Duration - rawMeta []byte - rawComp []byte + rawMeta json.RawMessage + rawComp json.RawMessage seqno sqn.SeqNo unhealthyReason *[]string } @@ -275,29 +275,37 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r seqno := validated.seqno unhealthyReason := validated.unhealthyReason - // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. - // Older agents will communicate any issues with upgrades via the Ack endpoint. 
- if err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails); err != nil { - return fmt.Errorf("failed to update upgrade_details: %w", err) - } - - initialOpts := []checkin.Option{ + checkinOpts := make([]checkin.Option, 0, 14) // maximum of 14 (pre-allocate to reduce allocs) + checkinOpts = append(checkinOpts, + checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), - checkin.WithMeta(rawMeta), - checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""), + ) + if len(rawMeta) > 0 { + checkinOpts = append(checkinOpts, checkin.WithMeta(&rawMeta)) + } + if len(rawComponents) > 0 { + checkinOpts = append(checkinOpts, checkin.WithComponents(&rawComponents)) } - revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req) + // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. + // Older agents will communicate any issues with upgrades via the Ack endpoint. + // checkinOpts is passed in to reduce allocations + checkinOpts, err = ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails, checkinOpts) if err != nil { - return fmt.Errorf("failed to update policy details: %w", err) + return fmt.Errorf("failed to update upgrade_details: %w", err) } - if len(opts) > 0 { - initialOpts = append(initialOpts, opts...) + + // Handle the policy ID and the revision idx upon check-in. We want to always ensure that + // the agent document represents the current state of this check-in. 
+ var effectivePolicyID string + var effectiveRevIdx int64 + checkinOpts, effectivePolicyID, effectiveRevIdx, err = ct.processPolicyDetails(r.Context(), zlog, agent, req, checkinOpts) + if err != nil { + return fmt.Errorf("failed to process policy details: %w", err) } // Subscribe to actions dispatcher @@ -305,31 +313,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r defer ct.ad.Unsubscribe(zlog, aSub) actCh := aSub.Ch() - for _, output := range agent.Outputs { - if output.APIKey == "" { - // use revision_idx=0 if the agent has a single output where no API key is defined - // This will force the policy monitor to emit a new policy to regerate API keys - revID = 0 - break - } - } - // Subscribe to policy manager for changes on PolicyId > policyRev - sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID) + sub, err := ct.pm.Subscribe(agent.Id, effectivePolicyID, effectiveRevIdx) if err != nil { return fmt.Errorf("subscribe policy monitor: %w", err) } defer func() { err := ct.pm.Unsubscribe(sub) if err != nil { - zlog.Error().Err(err).Str(ecs.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy") + zlog.Error().Err(err).Str(ecs.PolicyID, effectivePolicyID).Msg("unable to unsubscribe from policy") } }() - // Update check-in timestamp on timeout - tick := time.NewTicker(ct.cfg.Timeouts.CheckinTimestamp) - defer tick.Stop() - setupDuration := time.Since(start) pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter) @@ -345,14 +340,18 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r longPoll := time.NewTicker(pollDuration) defer longPoll.Stop() - // Initial update on checkin, and any user fields that might have changed - // Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin. - // 8.16.x releases would incorrectly set unenrolled_at - err = ct.bc.CheckIn(agent.Id, initialOpts...) 
+ // Initial update on checkin updates any user fields that might have changed. + err = ct.bc.CheckIn(r.Context(), agent.Id, checkinOpts...) if err != nil { zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") + return err } + // Register the agent with the checkin to keep the `updated_at` field updated while connected. + // The defer will ensure that on return or panic that it is removed. + ct.bc.Add(agent.Id) + defer ct.bc.Remove(agent.Id) + // Initial fetch for pending actions var ( actions []Action @@ -402,11 +401,6 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r case <-longPoll.C: zlog.Trace().Msg("fire long poll") break LOOP - case <-tick.C: - err := ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithComponents(rawComponents), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason)) // FIXME If we change to properly handle empty strings we could stop passing optional args here. - if err != nil { - zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") - } } } } @@ -445,23 +439,29 @@ func (ct *CheckinT) verifyActionExists(vCtx context.Context, vSpan *apm.Span, ag // if the agent doc and checkin details are both nil the method is a nop // if the checkin upgrade_details is nil but there was a previous value in the agent doc, fleet-server treats it as a successful upgrade // otherwise the details are validated; action_id is checked and upgrade_details.metadata is validated based on upgrade_details.state and the agent doc is updated. 
-func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails) error { +func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails, checkinOpts []checkin.Option) ([]checkin.Option, error) { if details == nil { - err := ct.markUpgradeComplete(ctx, agent) - if err != nil { - return err + if agent.UpgradeDetails == nil { + return checkinOpts, nil } - return nil + upgradedAt := time.Now().UTC().Format(time.RFC3339) + checkinOpts = append(checkinOpts, + checkin.WithUpgradeDetails(nil), + checkin.WithUpgradeStartedAt(nil), + checkin.WithUpgradeStatus(nil), + checkin.WithUpgradedAt(&upgradedAt), + ) + return checkinOpts, nil } - // update docs with in progress details + // update docs with in progress details vSpan, vCtx := apm.StartSpan(ctx, "Check update action", "validate") action, err := ct.verifyActionExists(ctx, vSpan, agent, details) if err != nil { - return err + return nil, err } if action == nil { - return nil + return checkinOpts, nil } // link action with APM spans @@ -495,84 +495,60 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen upgradeDetails, err := details.Metadata.AsUpgradeMetadataDownloading() if err != nil { vSpan.End() - return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } if err := details.Metadata.FromUpgradeMetadataDownloading(upgradeDetails); err != nil { vSpan.End() - return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } case UpgradeDetailsStateUPGFAILED: if details.Metadata == nil { vSpan.End() - return fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + 
return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataFailed() if err != nil { vSpan.End() - return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } if meta.ErrorMsg == "" { vSpan.End() - return fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) + return nil, fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) } // Repack metadata in failed case as the agent may send UPG_DOWNLOADING attributes. if err = details.Metadata.FromUpgradeMetadataFailed(meta); err != nil { vSpan.End() - return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } case UpgradeDetailsStateUPGSCHEDULED: if details.Metadata == nil { vSpan.End() - return fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataScheduled() if err != nil { vSpan.End() - return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) + return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) } if meta.ScheduledAt.IsZero() { vSpan.End() - return fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) + return nil, fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) } default: } vSpan.End() - doc := bulk.UpdateFields{ - 
dl.FieldUpgradeDetails: details, - } - if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { - doc[dl.FieldUpgradeAttempts] = nil - } - - body, err := doc.Marshal() + detailsBody, err := json.Marshal(details) if err != nil { - return err - } - return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) -} - -func (ct *CheckinT) markUpgradeComplete(ctx context.Context, agent *model.Agent) error { - // nop if there are no checkin details, and the agent has no details - if agent.UpgradeDetails == nil { - return nil + return nil, fmt.Errorf("failed to marshal upgrade details: %w", err) } - span, ctx := apm.StartSpan(ctx, "Mark update complete", "update") - span.Context.SetLabel("agent_id", agent.Agent.ID) - defer span.End() - // if the checkin had no details, but agent has details treat like a successful upgrade - doc := bulk.UpdateFields{ - dl.FieldUpgradeDetails: nil, - dl.FieldUpgradeStartedAt: nil, - dl.FieldUpgradeStatus: nil, - dl.FieldUpgradedAt: time.Now().UTC().Format(time.RFC3339), - } - body, err := doc.Marshal() - if err != nil { - return err + detailsBodyJSON := json.RawMessage(detailsBody) + checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(&detailsBodyJSON)) + if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { + checkinOpts = append(checkinOpts, checkin.WithUpgradeAttempts(nil)) } - return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) + return checkinOpts, nil } func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, agent *model.Agent, resp CheckinResponse) error { @@ -1040,6 +1016,12 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] return nil, nil } + // Check for empty string - not valid metadata + if str, ok := reqLocalMeta.(string); ok && str == "" { + zlog.Warn().Msg("local metadata empty; won't 
update metadata") + return nil, nil + } + // Deserialize the agent's metadata copy. If it fails, it's ignored as it will just // be replaced with the correct contents from the clients checkin. var agentLocalMeta interface{} @@ -1047,24 +1029,21 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] zlog.Warn().Err(err).Msg("local_metadata in document invalid; ignoring it") } - var outMeta []byte - - // Compare the deserialized meta structures and return the bytes to update if different - if !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { - - zlog.Trace(). - RawJSON("oldLocalMeta", agent.LocalMetadata). - RawJSON("newLocalMeta", req.LocalMetadata). - Msg("local metadata not equal") + // Compare the deserialized meta structures, already the same means no update needs to occur. + if reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { + return nil, nil + } - zlog.Info(). - RawJSON("req.LocalMeta", req.LocalMetadata). - Msg("applying new local metadata") + zlog.Trace(). + RawJSON("oldLocalMeta", agent.LocalMetadata). + RawJSON("newLocalMeta", req.LocalMetadata). + Msg("local metadata not equal") - outMeta = req.LocalMetadata - } + zlog.Info(). + RawJSON("req.LocalMeta", req.LocalMetadata). + Msg("applying new local metadata") - return outMeta, nil + return req.LocalMetadata, nil } func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) { @@ -1199,54 +1178,69 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu return pollDuration, jitter } -// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request. -// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported. -// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin. 
-func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) { - // no details specified or attributes are ignored by config - if ct.cfg.Features.IgnoreCheckinPolicyID || req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil { - return agent.PolicyRevisionIdx, nil, nil - } - policyID := *req.AgentPolicyId - revisionIDX := *req.PolicyRevisionIdx - - span, ctx := apm.StartSpan(ctx, "Process policy details", "process") - span.Context.SetLabel("agent_id", agent.Agent.ID) - span.Context.SetLabel(dl.FieldAgentPolicyID, policyID) - span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX) - defer span.End() - - // update agent doc if policy id or revision idx does not match - var opts []checkin.Option - if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { - opts = []checkin.Option{ - checkin.WithAgentPolicyID(policyID), - checkin.WithPolicyRevisionIDX(revisionIDX), +func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest, checkinOpts []checkin.Option) ([]checkin.Option, string, int64, error) { + // Handle the policy ID and the revision idx upon check-in. We want to always ensure that + // the agent document represents the current state of this check-in. + policyID := agent.PolicyID + revIdx := agent.PolicyRevisionIdx + if !ct.cfg.Features.IgnoreCheckinPolicyID && req.AgentPolicyId != nil && *req.AgentPolicyId != "" && req.PolicyRevisionIdx != nil && *req.PolicyRevisionIdx >= 0 { + policyID = *req.AgentPolicyId + revIdx = *req.PolicyRevisionIdx + zlog.Debug(). + Str("policy_id", agent.PolicyID). + Str("checkin_policy_id", policyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("checkin_revision_idx", revIdx). 
+ Msg("checkin included policy_id and revision_idx") + checkinOpts = append(checkinOpts, checkin.WithAgentPolicyID(policyID), checkin.WithPolicyRevisionIDX(revIdx)) + } + + // determine the effective policy ID and revision idx to use for the policy monitor. + // this can be different based on the situations to force a POLICY_CHANGE. + effectivePolicyID := policyID + effectiveRevIdx := revIdx + if len(agent.Outputs) == 0 { + // no outputs yet, force policy regeneration + effectiveRevIdx = 0 + } else { + for _, output := range agent.Outputs { + if output.APIKey == "" { + // use revision_idx=0 if the agent has a single output where no API key is defined + // This will force the policy monitor to emit a new policy to regenerate API keys + effectiveRevIdx = 0 + break + } } } - // Policy reassign, subscribe to policy with revision 0 - if policyID != agent.PolicyID { - zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.") - return 0, opts, nil - } - - // Check if the checkin revision_idx is greater than the latest available - latestRev := ct.pm.LatestRev(ctx, agent.PolicyID) - if latestRev != 0 && revisionIDX > latestRev { - revisionIDX = 0 // set return val to 0 so the agent gets latest available revision. + if effectivePolicyID != agent.PolicyID { + // different policyID so the revisionIdx gets cleared + effectiveRevIdx = 0 } - - // Update API keys if the policy has changed, or if the revision differs. 
- if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { - for outputName, output := range agent.Outputs { - if output.Type != policy.OutputTypeElasticsearch { - continue - } - if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil { - // Only returns ErrUpdatingInactiveAgent - return 0, nil, err + if effectivePolicyID != agent.PolicyID || effectiveRevIdx != agent.PolicyRevisionIdx { + // the agent is either on the wrong policy or the wrong revision of that policy + zlog.Info(). + Str("policy_id", agent.PolicyID). + Str("effective_policy_id", effectivePolicyID). + Int64("revision_idx", agent.PolicyRevisionIdx). + Int64("effective_revision_idx", effectiveRevIdx). + Msg("effective policy is different; resulting in a POLICY_CHANGE") + + // skipped when ignore checkin is enabled, because this never used to occur until this started + // using the policy information in the checkin body + if !ct.cfg.Features.IgnoreCheckinPolicyID { + // this invalidates any of the API keys + for outputName, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName) + if err != nil { + // Only returns ErrUpdatingInactiveAgent + return nil, "", 0, fmt.Errorf("failed to update policy details: %w", err) + } } } } - return revisionIDX, opts, nil + + return checkinOpts, effectivePolicyID, effectiveRevIdx, nil } diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index a02bd9c0e7..758c9c6fac 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -18,7 +18,6 @@ import ( "testing" "time" - "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" 
"github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -376,17 +375,6 @@ func TestResolveSeqNo(t *testing.T) { func TestProcessUpgradeDetails(t *testing.T) { esd := model.ESDocument{Id: "doc-ID"} - doc := bulk.UpdateFields{ - dl.FieldUpgradeDetails: &UpgradeDetails{ - ActionId: "test-action", - State: UpgradeDetailsStateUPGWATCHING, - }, - dl.FieldUpgradeAttempts: nil, - } - body, err := doc.Marshal() - if err != nil { - t.Fatalf("marshal error: %v", err) - } tests := []struct { name string agent *model.Agent @@ -399,7 +387,9 @@ func TestProcessUpgradeDetails(t *testing.T) { agent: &model.Agent{ESDocument: esd}, details: nil, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { return testcache.NewMockCache() @@ -419,7 +409,7 @@ func TestProcessUpgradeDetails(t *testing.T) { t.Logf("bulk match unmarshal error: %v", err) return false } - return doc.Doc[dl.FieldUpgradeDetails] == nil && doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && doc.Doc[dl.FieldUpgradedAt] != "" + return doc.Doc[dl.FieldUpgradeDetails] == nil && doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && getFieldStringValue(doc.Doc, dl.FieldUpgradedAt) != "" }), mock.Anything, mock.Anything).Return(nil) return mBulk }, @@ -473,6 +463,7 @@ func TestProcessUpgradeDetails(t *testing.T) { bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() mBulk.On("Search", mock.Anything, dl.FleetActions, mock.Anything, mock.Anything).Return(&es.ResultT{}, es.ErrNotFound) + noUpgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -509,7 +500,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":"2023:01:02T12:00:00Z"}`)}, }, bulk: func() 
*ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -526,7 +519,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":""}`)}, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -542,7 +537,9 @@ func TestProcessUpgradeDetails(t *testing.T) { State: UpgradeDetailsStateUPGSCHEDULED, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -560,7 +557,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -579,7 +576,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -671,7 +668,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -690,7 +687,7 @@ func 
TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -709,7 +706,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) + upgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -727,7 +724,9 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":""}`)}, }, bulk: func() *ftesting.MockBulk { - return ftesting.NewMockBulk() + mBulk := ftesting.NewMockBulk() + noUpgradeDetailsMockCheck(t, mBulk) + return mBulk }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -744,7 +743,19 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", body, mock.Anything, mock.Anything).Return(nil) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + upgradeAttempts, ok := doc.Doc[dl.FieldUpgradeAttempts] + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return upgradeDetails && ok && upgradeAttempts == nil && !noUpgradedAt + }), mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -760,23 +771,65 @@ func TestProcessUpgradeDetails(t *testing.T) { mBulk := tc.bulk() mCache := 
tc.cache() + bc := checkin.NewBulk(mBulk) ct := &CheckinT{ cache: mCache, + bc: bc, bulker: mBulk, } - err := ct.processUpgradeDetails(context.Background(), tc.agent, tc.details) + var err error + opts := make([]checkin.Option, 0, 3) + opts, err = ct.processUpgradeDetails(context.Background(), tc.agent, tc.details, opts) if tc.err == nil { - assert.NoError(t, err) + require.NoError(t, err) } else { - assert.ErrorIs(t, err, tc.err) + require.ErrorIs(t, err, tc.err) + } + + err = bc.CheckIn(context.Background(), tc.agent.Id, opts...) + if err != nil { + require.NoError(t, err) } + mBulk.AssertExpectations(t) mCache.AssertExpectations(t) }) } } +func noUpgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, noUpgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + _, noUpgradeStartedAt := doc.Doc[dl.FieldUpgradeStartedAt] + _, noUpgradeStatus := doc.Doc[dl.FieldUpgradeStatus] + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return !noUpgradeDetails && !noUpgradeStartedAt && !noUpgradeStatus && !noUpgradedAt + }), mock.Anything, mock.Anything).Return(nil) +} + +func upgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] + _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] + return upgradeDetails && !noUpgradedAt + }), mock.Anything, mock.Anything).Return(nil) +} + func Test_CheckinT_writeResponse(t *testing.T) { tests := []struct { name 
string @@ -1084,11 +1137,12 @@ func TestValidateCheckinRequest(t *testing.T) { verCon := mustBuildConstraints("8.0.0") tests := []struct { - name string - req *http.Request - cfg *config.Server - expErr error - expValid validatedCheckin + name string + req *http.Request + cfg *config.Server + currentMeta json.RawMessage + expErr error + expValid validatedCheckin }{ { name: "Invalid JSON", @@ -1152,6 +1206,121 @@ func TestValidateCheckinRequest(t *testing.T) { rawMeta: []byte(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`), }, }, + { + name: "local metadata matches", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "fips": true}}}}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + rawMeta: nil, // no need to update + }, + }, + { + name: "local metadata different JSON formatting", + req: &http.Request{ + // JSON with specific key ordering + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "version": "8.0.0"}}, "host": {"hostname": "test-host"}}}`)), + }, + expErr: nil, + // Same content but different key ordering in JSON - when unmarshaled and compared + // with reflect.DeepEqual they should be equal, but raw bytes are different + currentMeta: json.RawMessage(`{"host":{"hostname":"test-host"},"elastic":{"agent":{"version":"8.0.0","id":"testid"}}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + rawMeta: nil, // should recognize as same content despite different formatting + }, + }, + { + name: "local metadata is empty string and agent has nil", + 
req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), + }, + expErr: nil, + currentMeta: nil, + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is empty string and agent has different value", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is null and agent has nil", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), + }, + expErr: nil, + currentMeta: nil, + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, + { + name: "local metadata is null and agent has existing metadata", + req: &http.Request{ + Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), + }, + expErr: nil, + currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), + cfg: &config.Server{ + Limits: config.ServerLimits{ + CheckinLimit: config.Limit{ + MaxBody: 0, + }, + }, + }, + expValid: validatedCheckin{ + // don't update metadata + rawMeta: nil, + }, + }, } for _, tc := range tests { @@ -1160,7 +1329,7 @@ func TestValidateCheckinRequest(t *testing.T) { assert.NoError(t, err) wr := httptest.NewRecorder() logger := testlog.SetLogger(t) - valid, err := 
checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: json.RawMessage(`{}`)}) + valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: tc.currentMeta}) if tc.expErr == nil { assert.NoError(t, err) assert.Equal(t, tc.expValid.rawMeta, valid.rawMeta) @@ -1176,206 +1345,274 @@ func TestValidateCheckinRequest(t *testing.T) { } func TestProcessPolicyDetails(t *testing.T) { + esd := model.ESDocument{Id: "doc-ID"} policyID := "policy-id" revIDX2 := int64(2) tests := []struct { - name string - agent *model.Agent - req *CheckinRequest - getPolicyMonitor func() *mockPolicyMonitor - revIDX int64 - returnsOpts bool - err error + name string + agent *model.Agent + req *CheckinRequest + policyID string + revIdx int64 + bulk func() *ftesting.MockBulk + ignoreCheckin bool }{{ name: "request has no policy details", agent: &model.Agent{ + ESDocument: esd, + PolicyID: policyID, PolicyRevisionIdx: 1, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, - req: &CheckinRequest{}, - getPolicyMonitor: func() *mockPolicyMonitor { - return &mockPolicyMonitor{} + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 1, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk }, - revIDX: 1, - returnsOpts: false, - err: nil, }, { name: "policy reassign detected", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: "new-policy-id", AgentPolicyID: policyID, PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - return &mockPolicyMonitor{} + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + 
policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 0, - returnsOpts: false, - err: nil, }, { - name: "revision updated", + name: "no outputs", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: policyID, AgentPolicyID: policyID, - PolicyRevisionIdx: 1, - }, - req: &CheckinRequest{ - AgentPolicyId: &policyID, - PolicyRevisionIdx: &revIDX2, + PolicyRevisionIdx: 2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk }, - revIDX: 2, - returnsOpts: true, - err: nil, }, { - name: "checkin revision is greater than the policy's latest revision", + name: "missing output APIKey", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", + ESDocument: esd, + PolicyID: policyID, + AgentPolicyID: policyID, + PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + "remote": &model.PolicyOutput{}, }, + }, + req: &CheckinRequest{}, + policyID: policyID, + revIdx: 0, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk + }, + }, { + name: "revision updated", + agent: &model.Agent{ + ESDocument: esd, PolicyID: policyID, AgentPolicyID: policyID, PolicyRevisionIdx: 1, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(1)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() 
*ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 0, - returnsOpts: true, - err: nil, }, { - name: "agent_policy_id has changed", + name: "agent does not have agent_policy_id present", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, PolicyID: policyID, - AgentPolicyID: "old-policy-id", - PolicyRevisionIdx: 1, + PolicyRevisionIdx: 2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 2, - returnsOpts: true, - err: nil, }, { - name: "agent does not have agent_policy_id present", + name: "details present with no changes for agent doc", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, + AgentPolicyID: policyID, PolicyID: policyID, - PolicyRevisionIdx: 2, + PolicyRevisionIdx: revIDX2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + policyDetailsMockCheck(t, mBulk, policyID, revIDX2) + return mBulk }, - revIDX: 2, - returnsOpts: true, - err: nil, }, { - name: "details present with no changes from agent doc", + 
name: "details present ignore checkin", agent: &model.Agent{ - Agent: &model.AgentMetadata{ - ID: "agent-id", - }, + ESDocument: esd, AgentPolicyID: policyID, PolicyID: policyID, PolicyRevisionIdx: revIDX2, + Outputs: map[string]*model.PolicyOutput{ + "default": &model.PolicyOutput{ + APIKey: "123", + }, + }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - getPolicyMonitor: func() *mockPolicyMonitor { - pm := &mockPolicyMonitor{} - pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() - return pm + policyID: policyID, + revIdx: revIDX2, + bulk: func() *ftesting.MockBulk { + mBulk := ftesting.NewMockBulk() + noPolicyDetailsMockCheck(t, mBulk) + return mBulk }, - revIDX: 2, - returnsOpts: false, - err: nil, + ignoreCheckin: true, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { logger := testlog.SetLogger(t) - pm := tc.getPolicyMonitor() - checkin := &CheckinT{ - cfg: &config.Server{}, - bulker: ftesting.NewMockBulk(), - pm: pm, - } - revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req) - assert.Equal(t, tc.revIDX, revIDX) - if tc.returnsOpts { - assert.NotEmpty(t, opts) - } else { - assert.Empty(t, opts) + mBulk := tc.bulk() + bc := checkin.NewBulk(mBulk) + cfg := &config.Server{} + if tc.ignoreCheckin { + cfg.Features.IgnoreCheckinPolicyID = true } - if tc.err != nil { - assert.ErrorIs(t, tc.err, err) - } else { - assert.NoError(t, err) + ct := &CheckinT{ + bc: bc, + bulker: mBulk, + cfg: cfg, } - pm.AssertExpectations(t) + + opts := make([]checkin.Option, 0, 2) + opts, ePolicyID, eRevIdx, err := ct.processPolicyDetails(t.Context(), logger, tc.agent, tc.req, opts) + require.NoError(t, err) + assert.Equal(t, tc.policyID, ePolicyID) + assert.Equal(t, tc.revIdx, eRevIdx) + + err = bc.CheckIn(t.Context(), tc.agent.Id, opts...) 
+ assert.NoError(t, err) + + mBulk.AssertExpectations(t) }) } +} - t.Run("IgnoreCheckinPolicyID flag is set", func(t *testing.T) { - logger := testlog.SetLogger(t) - checkin := &CheckinT{ - cfg: &config.Server{ - Features: config.FeatureFlags{ - IgnoreCheckinPolicyID: true, - }, - }, +func noPolicyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false } - revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, - &model.Agent{ - PolicyID: policyID, - PolicyRevisionIdx: 1, - }, - &CheckinRequest{ - AgentPolicyId: &policyID, - PolicyRevisionIdx: &revIDX2, - }, - ) - assert.NoError(t, err) - assert.Equal(t, int64(1), revIDX) - assert.Empty(t, opts) - }) + _, noAgentPolicyID := doc.Doc[dl.FieldAgentPolicyID] + _, noPolicyRevisionIdx := doc.Doc[dl.FieldPolicyRevisionIdx] + return !noAgentPolicyID && !noPolicyRevisionIdx + }), mock.Anything, mock.Anything).Return(nil) +} + +func policyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk, policyID string, revIdx int64) { + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { + doc := struct { + Doc map[string]interface{} `json:"doc"` + }{} + if err := json.Unmarshal(p, &doc); err != nil { + t.Logf("bulk match unmarshal error: %v", err) + return false + } + oPolicyID, hasPolicy := doc.Doc[dl.FieldAgentPolicyID] + if !hasPolicy { + return false + } + oRevIdx, hasRevIdx := doc.Doc[dl.FieldPolicyRevisionIdx] + if !hasRevIdx { + return false + } + oRevIdxF, ok := oRevIdx.(float64) + if !ok { + return false + } + return oPolicyID == policyID && int64(oRevIdxF) == revIdx + }), mock.Anything, mock.Anything).Return(nil) +} + +func getFieldStringValue(doc map[string]interface{}, key string) 
string { + raw, ok := doc[key] + if ok { + s, ok := raw.(string) + if ok { + return s + } + } + return "" } diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index a2304ffd85..cfef6f3f29 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -9,7 +9,6 @@ import ( "context" _ "embed" "encoding/json" - "errors" "fmt" "sync" "time" @@ -42,128 +41,225 @@ func WithFlushInterval(d time.Duration) Opt { } // Option is the type for optional arguments for agent checkins. -type Option func(*pendingT) +type Option func(*checkinT) func WithStatus(status string) Option { - return func(pending *pendingT) { - pending.status = status + return func(state *checkinT) { + state.status.isSet = true + state.status.value = status } } func WithMessage(message string) Option { - return func(pending *pendingT) { - pending.message = message + return func(state *checkinT) { + state.message.isSet = true + state.message.value = message } } func WithUnhealthyReason(reason *[]string) Option { - return func(pending *pendingT) { - pending.unhealthyReason = reason + return func(state *checkinT) { + state.unhealthyReason.isSet = true + state.unhealthyReason.value = reason } } -func WithMeta(meta []byte) Option { - return func(pending *pendingT) { - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.meta = meta +func WithMeta(meta *json.RawMessage) Option { + return func(state *checkinT) { + state.meta.isSet = true + state.meta.value = meta } } func WithSeqNo(seqno sqn.SeqNo) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if !seqno.IsSet() { return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.seqNo = seqno + state.seqNo = seqno } } func WithVer(ver string) Option { - return func(pending *pendingT) { - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.ver = ver + return func(state *checkinT) { + state.ver.isSet = true + state.ver.value = ver } } 
-func WithComponents(components []byte) Option { - return func(pending *pendingT) { - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.components = components +func WithComponents(components *json.RawMessage) Option { + return func(state *checkinT) { + state.components.isSet = true + state.components.value = components } } func WithDeleteAudit(del bool) Option { - return func(pending *pendingT) { + return func(state *checkinT) { if !del { return } - if pending.extra == nil { - pending.extra = &extraT{} - } - pending.extra.deleteAudit = del + state.deleteAudit = del } } func WithAgentPolicyID(id string) Option { - return func(pending *pendingT) { - pending.agentPolicyID = id + return func(state *checkinT) { + state.agentPolicyID.isSet = true + state.agentPolicyID.value = id } } func WithPolicyRevisionIDX(idx int64) Option { - return func(pending *pendingT) { - pending.revisionIDX = idx + return func(state *checkinT) { + state.revisionIDX.isSet = true + state.revisionIDX.value = idx + } +} + +func WithUpgradeDetails(details *json.RawMessage) Option { + return func(state *checkinT) { + state.upgradeDetails.isSet = true + state.upgradeDetails.value = details + } +} + +func WithUpgradeStartedAt(startedAt *string) Option { + return func(state *checkinT) { + state.upgradeStartedAt.isSet = true + state.upgradeStartedAt.value = startedAt + } +} + +func WithUpgradeStatus(status *string) Option { + return func(state *checkinT) { + state.upgradeStatus.isSet = true + state.upgradeStatus.value = status } } -type extraT struct { - meta []byte - seqNo sqn.SeqNo - ver string - components []byte +func WithUpgradeAttempts(attempts *[]string) Option { + return func(state *checkinT) { + state.upgradeAttempts.isSet = true + state.upgradeAttempts.value = attempts + } +} + +func WithUpgradedAt(upgradedAt *string) Option { + return func(state *checkinT) { + state.upgradedAt.isSet = true + state.upgradedAt.value = upgradedAt + } +} + +type setT[T any] struct { + 
isSet bool + value T +} + +type checkinT struct { + ts string + seqNo sqn.SeqNo + + ver setT[string] + status setT[string] + message setT[string] + unhealthyReason setT[*[]string] + + agentPolicyID setT[string] + revisionIDX setT[int64] + + meta setT[*json.RawMessage] + components setT[*json.RawMessage] + + upgradeDetails setT[*json.RawMessage] + upgradeStartedAt setT[*string] + upgradeStatus setT[*string] + upgradeAttempts setT[*[]string] + upgradedAt setT[*string] + deleteAudit bool } -// Minimize the size of this structure. -// There will be 10's of thousands of items -// in the map at any point. -type pendingT struct { - ts string - status string - message string - agentPolicyID string // may be empty - revisionIDX int64 - extra *extraT - unhealthyReason *[]string +func (c *checkinT) toBody() ([]byte, error) { + fields := bulk.UpdateFields{ + dl.FieldUpdatedAt: c.ts, + dl.FieldLastCheckin: c.ts, + } + if c.unhealthyReason.isSet { + fields[dl.FieldUnhealthyReason] = c.unhealthyReason.value + } + if c.status.isSet { + fields[dl.FieldLastCheckinStatus] = c.status.value + } + if c.message.isSet { + fields[dl.FieldLastCheckinMessage] = c.message.value + } + if c.agentPolicyID.isSet { + fields[dl.FieldAgentPolicyID] = c.agentPolicyID.value + } + if c.revisionIDX.isSet { + fields[dl.FieldPolicyRevisionIdx] = c.revisionIDX.value + } + // If the agent version is not empty it needs to be updated + // Assuming the agent can be upgraded keeping the same id, but incrementing the version + if c.ver.isSet && c.ver.value != "" { + fields[dl.FieldAgent] = map[string]interface{}{ + dl.FieldAgentVersion: c.ver, + } + } + + // Update local metadata if provided (and has a value) + if c.meta.isSet { + // Surprise: The json encoder compacts this raw JSON during + // the encode process, so there my be unexpected memory overhead: + // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 + fields[dl.FieldLocalMetadata] = 
c.meta.value + } + + // Update components if provided (and has a value) + if c.components.isSet { + fields[dl.FieldComponents] = c.components.value + } + + if c.upgradeDetails.isSet { + fields[dl.FieldUpgradeDetails] = c.upgradeDetails.value + } + if c.upgradeStartedAt.isSet { + fields[dl.FieldUpgradeStartedAt] = c.upgradeStartedAt.value + } + if c.upgradeStatus.isSet { + fields[dl.FieldUpgradeStatus] = c.upgradeStatus.value + } + if c.upgradeAttempts.isSet { + fields[dl.FieldUpgradeAttempts] = c.upgradeAttempts.value + } + if c.upgradedAt.isSet { + fields[dl.FieldUpgradedAt] = c.upgradedAt.value + } + + // If seqNo changed, set the field appropriately + if c.seqNo.IsSet() { + fields[dl.FieldActionSeqNo] = c.seqNo + } + return fields.Marshal() } -// Bulk will batch pending checkins and update elasticsearch at a set interval. +// Bulk handles checkins and keeps connected agents updated_at timestamp up-to-date. type Bulk struct { - opts optionsT - bulker bulk.Bulk - mut sync.Mutex - pending map[string]pendingT - - ts string - unix int64 + opts optionsT + bulker bulk.Bulk + mut sync.RWMutex + connected map[string]struct{} } func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk { parsedOpts := parseOpts(opts...) return &Bulk{ - opts: parsedOpts, - bulker: bulker, - pending: make(map[string]pendingT), + opts: parsedOpts, + bulker: bulker, + connected: make(map[string]struct{}), } } @@ -179,35 +275,81 @@ func parseOpts(opts ...Opt) optionsT { return outOpts } -// Generate and cache timestamp on seconds change. -// Avoid thousands of formats of an identical string. -func (bc *Bulk) timestamp() string { - // WARNING: Expects mutex locked. - now := time.Now() - if now.Unix() != bc.unix { - bc.unix = now.Unix() - bc.ts = now.UTC().Format(time.RFC3339) - } - - return bc.ts +// Add adds the agent to the connected list. 
+func (bc *Bulk) Add(id string) { + bc.mut.Lock() + defer bc.mut.Unlock() + bc.connected[id] = struct{}{} } -// CheckIn will add the agent (identified by id) to the pending set. -// The pending agents are sent to elasticsearch as a bulk update at each flush interval. -// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down. -// WARNING: Bulk will take ownership of fields, so do not use after passing in. -func (bc *Bulk) CheckIn(id string, opts ...Option) error { +// Remove removes the agent from the connected list. +func (bc *Bulk) Remove(id string) { bc.mut.Lock() - pending := pendingT{ - ts: bc.timestamp(), - } + defer bc.mut.Unlock() + delete(bc.connected, id) +} +// CheckIn records that the agent has checked-in. +// +// This does not call `Add`, the caller should also call that to record that the agent is connected so that bulk +// can keep the documents `updated_at` fields updated. +// WARNING: CheckIn will take ownership of fields, so do not use after passing in. +func (bc *Bulk) CheckIn(ctx context.Context, id string, opts ...Option) error { + now := time.Now().UTC().Format(time.RFC3339) + checkin := checkinT{ + ts: now, + } for _, opt := range opts { - opt(&pending) + opt(&checkin) + } + + // update the agent document + body, err := checkin.toBody() + if err != nil { + return fmt.Errorf("could not marshall update body: %w", err) + } + bulkOpts := []bulk.Opt{bulk.WithRetryOnConflict(3)} + if checkin.seqNo.IsSet() { + bulkOpts = append(bulkOpts, bulk.WithRefresh()) + } + + // no deleteAudit so a simple update will work + if !checkin.deleteAudit { + err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulkOpts...) 
+ if err != nil { + return fmt.Errorf("failed to update document: %w", err) + } + return nil } - bc.pending[id] = pending - bc.mut.Unlock() + // deleteAudit requires the usage of MUpdate to allow updating the body and then removing the needed fields. + action := &estypes.UpdateAction{ + Script: &estypes.Script{ + Lang: &scriptlanguage.Painless, + Source: &deleteAuditAttributesScript, + Options: map[string]string{}, + }, + } + actionBody, err := json.Marshal(&action) + if err != nil { + return fmt.Errorf("could not marshall script action: %w", err) + } + updates := []bulk.MultiOp{ + { + ID: id, + Index: dl.FleetAgents, + Body: body, + }, + { + ID: id, + Index: dl.FleetAgents, + Body: actionBody, + }, + } + _, err = bc.bulker.MUpdate(ctx, updates, bulkOpts...) + if err != nil { + return fmt.Errorf("failed update document and remove audit fields: %w", err) + } return nil } @@ -219,7 +361,7 @@ func (bc *Bulk) Run(ctx context.Context) error { for { select { case <-tick.C: - if err := bc.flush(ctx); err != nil { + if err := bc.flushConnected(ctx); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") } @@ -229,199 +371,30 @@ func (bc *Bulk) Run(ctx context.Context) error { } } -// flush sends the minium data needed to update records in elasticsearch. -func (bc *Bulk) flush(ctx context.Context) error { - start := time.Now() - - bc.mut.Lock() - pending := bc.pending - bc.pending = make(map[string]pendingT, len(pending)) - bc.mut.Unlock() - - if len(pending) == 0 { - return nil +// flushConnected does an update that updates all the updated_at fields on all currently +// connected agents. 
+func (bc *Bulk) flushConnected(ctx context.Context) error { + nowTimestamp := time.Now().UTC().Format(time.RFC3339) + fields := bulk.UpdateFields{ + dl.FieldUpdatedAt: nowTimestamp, } - - updates := make([]bulk.MultiOp, 0, len(pending)) - - simpleCache := make(map[pendingT][]byte) - - nowTimestamp := start.UTC().Format(time.RFC3339) - - var err error - var needRefresh bool - for id, pendingData := range pending { - var body []byte - if pendingData.extra == nil { - // agents that checkin without extra attributes are cachable - // Cacheable agents can share the same status, message, and unhealthy reason. Timestamps are ignored. - // This prevents an extra JSON serialization when agents have the same update body. - var ok bool - body, ok = simpleCache[pendingData] - if !ok { - body, err = toUpdateBody(nowTimestamp, pendingData) - if err != nil { - return err - } - simpleCache[pendingData] = body - } - } else if pendingData.extra.deleteAudit { - if pendingData.extra.seqNo.IsSet() { - needRefresh = true - } - // Use a script instead of a partial doc to update if attributes need to be removed - params, err := encodeParams(nowTimestamp, pendingData) - if err != nil { - return fmt.Errorf("unable to parse checkin details as params: %w", err) - } - action := &estypes.UpdateAction{ - Script: &estypes.Script{ - Lang: &scriptlanguage.Painless, - Source: &deleteAuditAttributesScript, - Options: map[string]string{}, - Params: params, - }, - } - body, err = json.Marshal(&action) - if err != nil { - return fmt.Errorf("could not marshall script action: %w", err) - } - } else { - if pendingData.extra.seqNo.IsSet() { - needRefresh = true - } - body, err = toUpdateBody(nowTimestamp, pendingData) - if err != nil { - return err - } - } - + body, err := fields.Marshal() + if err != nil { + return fmt.Errorf("marshal updated_at field error: %w", err) + } + bc.mut.RLock() + updates := make([]bulk.MultiOp, 0, len(bc.connected)) + for id := range bc.connected { updates = append(updates, 
bulk.MultiOp{ ID: id, Body: body, Index: dl.FleetAgents, }) } - - var opts []bulk.Opt - if needRefresh { - opts = append(opts, bulk.WithRefresh()) + bc.mut.RUnlock() + _, err = bc.bulker.MUpdate(ctx, updates) + if err != nil { + return fmt.Errorf("mupdate error: %w", err) } - - _, err = bc.bulker.MUpdate(ctx, updates, opts...) - - zerolog.Ctx(ctx).Trace(). - Err(err). - Dur("rtt", time.Since(start)). - Int("cnt", len(updates)). - Bool("refresh", needRefresh). - Msg("Flush updates") - - return err -} - -func toUpdateBody(now string, pending pendingT) ([]byte, error) { - fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: now, // Set "updated_at" to the current timestamp - dl.FieldLastCheckin: pending.ts, // Set the checkin timestamp - dl.FieldLastCheckinStatus: pending.status, // Set the pending status - dl.FieldLastCheckinMessage: pending.message, // Set the status message - dl.FieldUnhealthyReason: pending.unhealthyReason, - } - if pending.agentPolicyID != "" { - fields[dl.FieldAgentPolicyID] = pending.agentPolicyID - fields[dl.FieldPolicyRevisionIdx] = pending.revisionIDX - } - if pending.extra != nil { - // If the agent version is not empty it needs to be updated - // Assuming the agent can by upgraded keeping the same id, but incrementing the version - if pending.extra.ver != "" { - fields[dl.FieldAgent] = map[string]interface{}{ - dl.FieldAgentVersion: pending.extra.ver, - } - } - - // Update local metadata if provided - if pending.extra.meta != nil { - // Surprise: The json encoder compacts this raw JSON during - // the encode process, so there my be unexpected memory overhead: - // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 - fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta) - } - - // Update components if provided - if pending.extra.components != nil { - fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) - } - - // If seqNo changed, set the field 
appropriately - if pending.extra.seqNo.IsSet() { - fields[dl.FieldActionSeqNo] = pending.extra.seqNo - } - } - return fields.Marshal() -} - -func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error) { - var ( - tsNow json.RawMessage - ts json.RawMessage - status json.RawMessage - message json.RawMessage - reason json.RawMessage - - // optional attributes below - policyID json.RawMessage - revisionIDX json.RawMessage - ver json.RawMessage - meta json.RawMessage - components json.RawMessage - isSet json.RawMessage - seqNo json.RawMessage - - err error - ) - tsNow, err = json.Marshal(now) - Err := errors.Join(err) - ts, err = json.Marshal(data.ts) - Err = errors.Join(Err, err) - status, err = json.Marshal(data.status) - Err = errors.Join(Err, err) - message, err = json.Marshal(data.message) - Err = errors.Join(Err, err) - reason, err = json.Marshal(data.unhealthyReason) - Err = errors.Join(Err, err) - policyID, err = json.Marshal(data.agentPolicyID) - Err = errors.Join(Err, err) - revisionIDX, err = json.Marshal(data.revisionIDX) - Err = errors.Join(Err, err) - ver, err = json.Marshal(data.extra.ver) - Err = errors.Join(Err, err) - isSet, err = json.Marshal(data.extra.seqNo.IsSet()) - Err = errors.Join(Err, err) - seqNo, err = json.Marshal(data.extra.seqNo) - Err = errors.Join(Err, err) - if data.extra.meta != nil { - meta = data.extra.meta - } - if data.extra.components != nil { - components = data.extra.components - } - if Err != nil { - return nil, Err - } - return map[string]json.RawMessage{ - "Now": tsNow, - "TS": ts, - "Status": status, - "Message": message, - "UnhealthyReason": reason, - "PolicyID": policyID, - "RevisionIDX": revisionIDX, - "Ver": ver, - "Meta": meta, - "Components": components, - "SeqNoSet": isSet, - "SeqNo": seqNo, - }, nil + return nil } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index d0cb344d54..e6415b1c5e 100644 --- a/internal/pkg/checkin/bulk_test.go +++ 
b/internal/pkg/checkin/bulk_test.go @@ -8,13 +8,14 @@ import ( "context" "encoding/json" "testing" - "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" + estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" + "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage" "github.com/google/go-cmp/cmp" "github.com/rs/xid" @@ -28,21 +29,8 @@ import ( // Test with seq no // matchOp is used with mock.MatchedBy to match and validate the operation -func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) bool { - return func(ops []bulk.MultiOp) bool { - if len(ops) != 1 { - return false - } - if ops[0].ID != c.id { - return false - } - if ops[0].Index != dl.FleetAgents { - return false - } - tb.Log("Operation match! validating details...") - - // Decode and match operation - // NOTE putting the extra validation here seems strange, maybe we should read the args in the test body intstead? 
+func matchOp(tb testing.TB, c testcase) func([]byte) bool { + return func(body []byte) bool { type updateT struct { LastCheckin string `json:"last_checkin"` Status string `json:"last_checkin_status"` @@ -54,14 +42,14 @@ func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) b } m := make(map[string]updateT) - err := json.Unmarshal(ops[0].Body, &m) - require.NoErrorf(tb, err, "unable to validate operation body %s", string(ops[0].Body)) + err := json.Unmarshal(body, &m) + require.NoErrorf(tb, err, "unable to validate operation body %s", string(body)) sub, ok := m["doc"] require.True(tb, ok, "unable to validate operation: expected doc") - validateTimestamp(tb, ts.Truncate(time.Second), sub.LastCheckin) - validateTimestamp(tb, ts.Truncate(time.Second), sub.UpdatedAt) + assert.NotEmpty(tb, sub.LastCheckin) + assert.Equal(tb, sub.LastCheckin, sub.UpdatedAt) // should have same timestamp assert.Equal(tb, c.policyID, sub.AgentPolicyID) assert.Equal(tb, c.revisionIDX, sub.RevisionIDX) if c.seqno != nil { @@ -71,13 +59,38 @@ func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) b } if c.meta != nil { - assert.Equal(tb, json.RawMessage(c.meta), sub.Meta) + assert.Equal(tb, c.meta, sub.Meta) } assert.Equal(tb, c.status, sub.Status) return true } } +// matchMOp is used with mock.MatchedBy to match and validate a multi update operation +func matchMOp(tb testing.TB, c testcase) func([]bulk.MultiOp) bool { + return func(updates []bulk.MultiOp) bool { + require.Len(tb, updates, 2) + + // first is the same body as matchOp checks + assert.Equal(tb, c.id, updates[0].ID) + assert.Equal(tb, dl.FleetAgents, updates[0].Index) + assert.True(tb, matchOp(tb, c)(updates[0].Body)) + + // second is the painless script to remove the audit fields + assert.Equal(tb, c.id, updates[1].ID) + assert.Equal(tb, dl.FleetAgents, updates[1].Index) + + var action estypes.UpdateAction + err := json.Unmarshal(updates[1].Body, &action) + require.NoError(tb, 
err, "second update body not an update action") + require.NotNil(tb, action.Script) + assert.Equal(tb, &scriptlanguage.Painless, action.Script.Lang) + assert.Equal(tb, &deleteAuditAttributesScript, action.Script.Source) + + return true + } +} + type testcase struct { name string id string @@ -85,16 +98,15 @@ type testcase struct { message string policyID string revisionIDX int64 - meta []byte - components []byte + meta json.RawMessage + components json.RawMessage seqno sqn.SeqNo ver string unhealthyReason *[]string + deleteAudit bool } -func TestBulkSimple(t *testing.T) { - start := time.Now() - +func TestBulkCheckin(t *testing.T) { const ver = "8.9.0" cases := []testcase{{ name: "Simple case", @@ -129,6 +141,15 @@ func TestBulkSimple(t *testing.T) { meta: []byte(`{"hey":"now","brown":"cow"}`), components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), ver: ver, + }, { + name: "Multi field case w/ delete audit", + id: "multiFieldId", + status: "online", + message: "message", + meta: []byte(`{"hey":"now","brown":"cow"}`), + components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), + ver: ver, + deleteAudit: true, }, { name: "Multi field nested case", id: "multiFieldNestedId", @@ -168,18 +189,22 @@ func TestBulkSimple(t *testing.T) { t.Run(c.name, func(t *testing.T) { ctx := testlog.SetLogger(t).WithContext(t.Context()) mockBulk := ftesting.NewMockBulk() - mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() + if c.deleteAudit { + mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchMOp(t, c)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() + } else { + mockBulk.On("Update", mock.Anything, dl.FleetAgents, c.id, mock.MatchedBy(matchOp(t, c)), mock.Anything).Return(nil).Once() + } bc := NewBulk(mockBulk) opts := []Option{WithStatus(c.status), WithMessage(c.message)} if c.policyID != "" { opts = append(opts, WithAgentPolicyID(c.policyID), 
WithPolicyRevisionIDX(c.revisionIDX)) } - if c.meta != nil { - opts = append(opts, WithMeta(c.meta)) + if len(c.meta) > 0 { + opts = append(opts, WithMeta(&c.meta)) } - if c.components != nil { - opts = append(opts, WithComponents(c.components)) + if len(c.components) > 0 { + opts = append(opts, WithComponents(&c.components)) } if c.seqno != nil { opts = append(opts, WithSeqNo(c.seqno)) @@ -190,10 +215,11 @@ func TestBulkSimple(t *testing.T) { if c.unhealthyReason != nil { opts = append(opts, WithUnhealthyReason(c.unhealthyReason)) } + if c.deleteAudit { + opts = append(opts, WithDeleteAudit(true)) + } - err := bc.CheckIn(c.id, opts...) - require.NoError(t, err) - err = bc.flush(ctx) + err := bc.CheckIn(ctx, c.id, opts...) require.NoError(t, err) mockBulk.AssertExpectations(t) @@ -201,12 +227,6 @@ func TestBulkSimple(t *testing.T) { } } -func validateTimestamp(tb testing.TB, start time.Time, ts string) { - t1, err := time.Parse(time.RFC3339, ts) - require.NoErrorf(tb, err, "expected %q to be in RFC 3339 format", ts) - require.False(tb, start.After(t1), "timestamp in the past") -} - func benchmarkBulk(n int, b *testing.B) { mockBulk := ftesting.NewMockBulk() bc := NewBulk(mockBulk) @@ -221,10 +241,7 @@ func benchmarkBulk(n int, b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { for _, id := range ids { - err := bc.CheckIn(id) - if err != nil { - b.Fatal(err) - } + bc.Add(id) } } } @@ -247,14 +264,11 @@ func benchmarkFlush(n int, b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() for _, id := range ids { - err := bc.CheckIn(id) // TODO ths benchmark is not very interesting as the simplecache is used - if err != nil { - b.Fatal(err) - } + bc.Add(id) } b.StartTimer() - err := bc.flush(ctx) + err := bc.flushConnected(ctx) if err != nil { b.Fatal(err) } diff --git a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless index 3b0f111865..8774fee674 100644 --- 
a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless +++ b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless @@ -1,24 +1,3 @@ -ctx._source.last_checkin = params.TS; -ctx._source.updated_at = params.Now; -ctx._source.last_checkin_status = params.Status; -ctx._source.last_checkin_message = params.Message; -ctx._source.unhealthy_reason = params.UnhealthyReason; -if (params.PolicyID != "") { - ctx._source.agent_policy_id = params.PolicyID; - ctx._source.policy_revision_idx = params.RevisionIDX; -} -if (params.Ver != "") { - ctx._source.agent.version = params.Ver; -} -if (params.Meta != null) { - ctx._source.local_metadata = params.Meta; -} -if (params.Components != null) { - ctx._source.components = params.Components; -} -if (params.SeqNoSet) { - ctx._source.action_seq_no = params.SeqNo; -} ctx._source.remove('audit_unenrolled_reason'); ctx._source.remove('audit_unenrolled_time'); ctx._source.remove('unenrolled_at'); // audit/unenroll no longer sets this but it did in the 8.16.x releases, so we need to clear it. diff --git a/internal/pkg/policy/sub.go b/internal/pkg/policy/sub.go index e767c35f29..31f8830bc8 100644 --- a/internal/pkg/policy/sub.go +++ b/internal/pkg/policy/sub.go @@ -77,7 +77,7 @@ func (n *subT) isEmpty() bool { func (n *subT) isUpdate(policy *model.Policy) bool { pRevIdx := policy.RevisionIdx - return pRevIdx > n.revIdx + return pRevIdx != n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. 
diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index ef8aeb29b8..6d5b155f55 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -521,7 +521,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst, bulker) g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run)) - bc := checkin.NewBulk(bulker) + bc := checkin.NewBulk(bulker, checkin.WithFlushInterval(cfg.Inputs[0].Server.Timeouts.CheckinTimestamp)) g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) ct, err := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, bulker) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 7bac90de52..36f565bbcc 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -1685,7 +1685,7 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) { _, ok = obj["audit_unenrolled_reason"] _, ok2 := obj["unenrolled_at"] return !ok && !ok2 - }, time.Second*20, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. agent doc: %v", obj) + }, time.Second*10, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. 
agent doc: %v", obj) cancel() srv.waitExit() //nolint:errcheck // test case } diff --git a/testing/e2e/api_version/client_api_current.go b/testing/e2e/api_version/client_api_current.go index 7ed80cccdf..459cc075d1 100644 --- a/testing/e2e/api_version/client_api_current.go +++ b/testing/e2e/api_version/client_api_current.go @@ -465,7 +465,7 @@ func (tester *ClientAPITester) TestEnrollAuditUnenroll() { tester.Require().NoError(err) _, ok := obj.Source["audit_unenrolled_reason"] return !ok - }, time.Second*20, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") + }, time.Second*10, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") } // TestEnrollUpgradeAction_MetadataDownloadRate_String checks that download metadata rates can be sent as strings @@ -675,15 +675,21 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { } } tester.Require().True(found, "unable to find POLICY_CHANGE action in 4th checkin response") - revIDX = int64(policyChange.Policy.Revision) - tester.Require().Equal(int64(iRev), revIDX, "Expected POLICY_CHANGE action to be for updated policy revision") + tester.Require().Equal(int64(iRev), int64(policyChange.Policy.Revision), "Expected POLICY_CHANGE action to be for updated policy revision") + // Document should reflect the current state of the Elastic Agent, which is always the revision_idx it used + // to check in. Only on an ACK or on the next check-in that states the next revision would it be updated to the + // revision_idx that comes from the policy. tester.Require().EventuallyWithT(func(c *assert.CollectT) { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) }, time.Second*10, time.Second) + // Bump the revision_idx to the same as the policy to signal to Fleet that the Elastic Agent is now + // running the latest revision that it was sent. 
+ revIDX = int64(policyChange.Policy.Revision) + // Do a normal checkin to "reset" to latest revision_idx // no actions are returned // Manage any API keys if present