diff --git a/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml b/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml deleted file mode 100644 index 827fd39e6e..0000000000 --- a/changelog/fragments/1762527662-fix-data-race-issue-during-checkin-when-multiple-fleet-servers-are-used.yaml +++ /dev/null @@ -1,45 +0,0 @@ -# REQUIRED -# Kind can be one of: -# - breaking-change: a change to previously-documented behavior -# - deprecation: functionality that is being removed in a later release -# - bug-fix: fixes a problem in a previous version -# - enhancement: extends functionality but does not break or fix existing behavior -# - feature: new functionality -# - known-issue: problems that we are aware of in a given version -# - security: impacts on the security of a product or a user’s deployment. -# - upgrade: important information for someone upgrading from a prior version -# - other: does not fit into any of the other categories -kind: bug-fix - -# REQUIRED for all kinds -# Change summary; a 80ish characters long description of the change. -summary: fix data race issue during checkin when multiple fleet-servers are used - -# REQUIRED for breaking-change, deprecation, known-issue -# Long description; in case the summary is not enough to describe the change -# this field accommodate a description without length limits. -# description: - -# REQUIRED for breaking-change, deprecation, known-issue -# impact: - -# REQUIRED for breaking-change, deprecation, known-issue -# action: - -# REQUIRED for all kinds -# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. -component: fleet-server - -# AUTOMATED -# OPTIONAL to manually add other PR URLs -# PR URL: A link the PR that added the changeset. 
-# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. -# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. -# Please provide it if you are adding a fragment for a different PR. -# pr: https://github.com/owner/repo/1234 - -# AUTOMATED -# OPTIONAL to manually add other issue URLs -# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). -# If not present is automatically filled by the tooling with the issue linked to the PR number. -# issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index d51280a905..1aa720f8a7 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -169,8 +169,8 @@ func invalidateAPIKeysOfInactiveAgent(ctx context.Context, zlog zerolog.Logger, type validatedCheckin struct { req *CheckinRequest dur time.Duration - rawMeta json.RawMessage - rawComp json.RawMessage + rawMeta []byte + rawComp []byte seqno sqn.SeqNo unhealthyReason *[]string } @@ -275,37 +275,29 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r seqno := validated.seqno unhealthyReason := validated.unhealthyReason - checkinOpts := make([]checkin.Option, 0, 14) // maximum of 14 (pre-allocate to reduce allocs) - checkinOpts = append(checkinOpts, + // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. + // Older agents will communicate any issues with upgrades via the Ack endpoint. 
+ if err := ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails); err != nil { + return fmt.Errorf("failed to update upgrade_details: %w", err) + } + + initialOpts := []checkin.Option{ checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), + checkin.WithMeta(rawMeta), + checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""), - ) - if len(rawMeta) > 0 { - checkinOpts = append(checkinOpts, checkin.WithMeta(&rawMeta)) - } - if len(rawComponents) > 0 { - checkinOpts = append(checkinOpts, checkin.WithComponents(&rawComponents)) } - // Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin. - // Older agents will communicate any issues with upgrades via the Ack endpoint. - // checkinOpts is passed in to reducing allocations - checkinOpts, err = ct.processUpgradeDetails(r.Context(), agent, req.UpgradeDetails, checkinOpts) + revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req) if err != nil { - return fmt.Errorf("failed to update upgrade_details: %w", err) + return fmt.Errorf("failed to update policy details: %w", err) } - - // Handle the policy ID and the revision idx upon check-in. We want to always ensure that - // the agent document represents the current state of this check-in. - var effectivePolicyID string - var effectiveRevIdx int64 - checkinOpts, effectivePolicyID, effectiveRevIdx, err = ct.processPolicyDetails(r.Context(), zlog, agent, req, checkinOpts) - if err != nil { - return fmt.Errorf("failed to process policy details: %w", err) + if len(opts) > 0 { + initialOpts = append(initialOpts, opts...) 
} // Subscribe to actions dispatcher @@ -313,18 +305,31 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r defer ct.ad.Unsubscribe(zlog, aSub) actCh := aSub.Ch() + for _, output := range agent.Outputs { + if output.APIKey == "" { + // use revision_idx=0 if the agent has a single output where no API key is defined + // This will force the policy monitor to emit a new policy to regerate API keys + revID = 0 + break + } + } + // Subscribe to policy manager for changes on PolicyId > policyRev - sub, err := ct.pm.Subscribe(agent.Id, effectivePolicyID, effectiveRevIdx) + sub, err := ct.pm.Subscribe(agent.Id, agent.PolicyID, revID) if err != nil { return fmt.Errorf("subscribe policy monitor: %w", err) } defer func() { err := ct.pm.Unsubscribe(sub) if err != nil { - zlog.Error().Err(err).Str(ecs.PolicyID, effectivePolicyID).Msg("unable to unsubscribe from policy") + zlog.Error().Err(err).Str(ecs.PolicyID, agent.PolicyID).Msg("unable to unsubscribe from policy") } }() + // Update check-in timestamp on timeout + tick := time.NewTicker(ct.cfg.Timeouts.CheckinTimestamp) + defer tick.Stop() + setupDuration := time.Since(start) pollDuration, jitter := calcPollDuration(zlog, pollDuration, setupDuration, ct.cfg.Timeouts.CheckinJitter) @@ -340,18 +345,14 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r longPoll := time.NewTicker(pollDuration) defer longPoll.Stop() - // Initial update on checkin updates any user fields that might have changed. - err = ct.bc.CheckIn(r.Context(), agent.Id, checkinOpts...) + // Initial update on checkin, and any user fields that might have changed + // Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin. + // 8.16.x releases would incorrectly set unenrolled_at + err = ct.bc.CheckIn(agent.Id, initialOpts...) 
if err != nil { zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") - return err } - // Register the agent with the checkin to keep the `updated_at` field updated while connected. - // The defer will ensure that on return or panic that it is removed. - ct.bc.Add(agent.Id) - defer ct.bc.Remove(agent.Id) - // Initial fetch for pending actions var ( actions []Action @@ -401,6 +402,11 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r case <-longPoll.C: zlog.Trace().Msg("fire long poll") break LOOP + case <-tick.C: + err := ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithComponents(rawComponents), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason)) // FIXME If we change to properly handle empty strings we could stop passing optional args here. + if err != nil { + zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed") + } } } } @@ -439,29 +445,23 @@ func (ct *CheckinT) verifyActionExists(vCtx context.Context, vSpan *apm.Span, ag // if the agent doc and checkin details are both nil the method is a nop // if the checkin upgrade_details is nil but there was a previous value in the agent doc, fleet-server treats it as a successful upgrade // otherwise the details are validated; action_id is checked and upgrade_details.metadata is validated based on upgrade_details.state and the agent doc is updated. 
-func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails, checkinOpts []checkin.Option) ([]checkin.Option, error) { +func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agent, details *UpgradeDetails) error { if details == nil { - if agent.UpgradeDetails == nil { - return checkinOpts, nil + err := ct.markUpgradeComplete(ctx, agent) + if err != nil { + return err } - upgradedAt := time.Now().UTC().Format(time.RFC3339) - checkinOpts = append(checkinOpts, - checkin.WithUpgradeDetails(nil), - checkin.WithUpgradeStartedAt(nil), - checkin.WithUpgradeStatus(nil), - checkin.WithUpgradedAt(&upgradedAt), - ) - return checkinOpts, nil + return nil } - // update docs with in progress details + vSpan, vCtx := apm.StartSpan(ctx, "Check update action", "validate") action, err := ct.verifyActionExists(ctx, vSpan, agent, details) if err != nil { - return nil, err + return err } if action == nil { - return checkinOpts, nil + return nil } // link action with APM spans @@ -495,60 +495,84 @@ func (ct *CheckinT) processUpgradeDetails(ctx context.Context, agent *model.Agen upgradeDetails, err := details.Metadata.AsUpgradeMetadataDownloading() if err != nil { vSpan.End() - return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } if err := details.Metadata.FromUpgradeMetadataDownloading(upgradeDetails); err != nil { vSpan.End() - return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) + return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGDOWNLOADING, err) } case UpgradeDetailsStateUPGFAILED: if details.Metadata == nil { vSpan.End() - return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + return fmt.Errorf("%w: metadata 
missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataFailed() if err != nil { vSpan.End() - return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } if meta.ErrorMsg == "" { vSpan.End() - return nil, fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) + return fmt.Errorf("%w: %s metadata contains empty error_msg attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED) } // Repack metadata in failed case as the agent may send UPG_DOWNLOADING attributes. if err = details.Metadata.FromUpgradeMetadataFailed(meta); err != nil { vSpan.End() - return nil, fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) + return fmt.Errorf("%w %s: unable to repack metadata: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGFAILED, err) } case UpgradeDetailsStateUPGSCHEDULED: if details.Metadata == nil { vSpan.End() - return nil, fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) + return fmt.Errorf("%w: metadata missing", ErrInvalidUpgradeMetadata) } meta, err := details.Metadata.AsUpgradeMetadataScheduled() if err != nil { vSpan.End() - return nil, fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) + return fmt.Errorf("%w %s: %w", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED, err) } if meta.ScheduledAt.IsZero() { vSpan.End() - return nil, fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) + return fmt.Errorf("%w: %s metadata contains empty scheduled_at attribute", ErrInvalidUpgradeMetadata, UpgradeDetailsStateUPGSCHEDULED) } default: } vSpan.End() - detailsBody, err := json.Marshal(details) - if err != nil { - return nil, 
fmt.Errorf("failed to marshal upgrade details: %w", err) + doc := bulk.UpdateFields{ + dl.FieldUpgradeDetails: details, } - detailsBodyJSON := json.RawMessage(detailsBody) - checkinOpts = append(checkinOpts, checkin.WithUpgradeDetails(&detailsBodyJSON)) if agent.UpgradeAttempts != nil && details.State == UpgradeDetailsStateUPGWATCHING { - checkinOpts = append(checkinOpts, checkin.WithUpgradeAttempts(nil)) + doc[dl.FieldUpgradeAttempts] = nil + } + + body, err := doc.Marshal() + if err != nil { + return err + } + return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) +} + +func (ct *CheckinT) markUpgradeComplete(ctx context.Context, agent *model.Agent) error { + // nop if there are no checkin details, and the agent has no details + if agent.UpgradeDetails == nil { + return nil } - return checkinOpts, nil + span, ctx := apm.StartSpan(ctx, "Mark update complete", "update") + span.Context.SetLabel("agent_id", agent.Agent.ID) + defer span.End() + // if the checkin had no details, but agent has details treat like a successful upgrade + doc := bulk.UpdateFields{ + dl.FieldUpgradeDetails: nil, + dl.FieldUpgradeStartedAt: nil, + dl.FieldUpgradeStatus: nil, + dl.FieldUpgradedAt: time.Now().UTC().Format(time.RFC3339), + } + body, err := doc.Marshal() + if err != nil { + return err + } + return ct.bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)) } func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, agent *model.Agent, resp CheckinResponse) error { @@ -1016,12 +1040,6 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] return nil, nil } - // Check for empty string - not valid metadata - if str, ok := reqLocalMeta.(string); ok && str == "" { - zlog.Warn().Msg("local metadata empty; won't update metadata") - return nil, nil - } - // Deserialize the agent's metadata copy. 
If it fails, it's ignored as it will just // be replaced with the correct contents from the clients checkin. var agentLocalMeta interface{} @@ -1029,21 +1047,24 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([] zlog.Warn().Err(err).Msg("local_metadata in document invalid; ignoring it") } - // Compare the deserialized meta structures, already the same means no update needs to occur. - if reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { - return nil, nil - } + var outMeta []byte + + // Compare the deserialized meta structures and return the bytes to update if different + if !reflect.DeepEqual(reqLocalMeta, agentLocalMeta) { + + zlog.Trace(). + RawJSON("oldLocalMeta", agent.LocalMetadata). + RawJSON("newLocalMeta", req.LocalMetadata). + Msg("local metadata not equal") - zlog.Trace(). - RawJSON("oldLocalMeta", agent.LocalMetadata). - RawJSON("newLocalMeta", req.LocalMetadata). - Msg("local metadata not equal") + zlog.Info(). + RawJSON("req.LocalMeta", req.LocalMetadata). + Msg("applying new local metadata") - zlog.Info(). - RawJSON("req.LocalMeta", req.LocalMetadata). - Msg("applying new local metadata") + outMeta = req.LocalMetadata + } - return req.LocalMetadata, nil + return outMeta, nil } func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) { @@ -1178,69 +1199,54 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu return pollDuration, jitter } -func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest, checkinOpts []checkin.Option) ([]checkin.Option, string, int64, error) { - // Handle the policy ID and the revision idx upon check-in. We want to always ensure that - // the agent document represents the current state of this check-in. 
- policyID := agent.PolicyID - revIdx := agent.PolicyRevisionIdx - if !ct.cfg.Features.IgnoreCheckinPolicyID && req.AgentPolicyId != nil && *req.AgentPolicyId != "" && req.PolicyRevisionIdx != nil && *req.PolicyRevisionIdx >= 0 { - policyID = *req.AgentPolicyId - revIdx = *req.PolicyRevisionIdx - zlog.Debug(). - Str("policy_id", agent.PolicyID). - Str("checkin_policy_id", policyID). - Int64("revision_idx", agent.PolicyRevisionIdx). - Int64("checkin_revision_idx", revIdx). - Msg("checkin included policy_id and revision_idx") - checkinOpts = append(checkinOpts, checkin.WithAgentPolicyID(policyID), checkin.WithPolicyRevisionIDX(revIdx)) - } - - // determine the effective policy ID and revision idx to use for the policy monitor. - // this can be different based on the situations to force a POLICY_CHANGE. - effectivePolicyID := policyID - effectiveRevIdx := revIdx - if len(agent.Outputs) == 0 { - // no outputs yet, force policy regeneration - effectiveRevIdx = 0 - } else { - for _, output := range agent.Outputs { - if output.APIKey == "" { - // use revision_idx=0 if the agent has a single output where no API key is defined - // This will force the policy monitor to emit a new policy to regenerate API keys - effectiveRevIdx = 0 - break - } +// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request. +// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported. +// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin. 
+func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) { + // no details specified or attributes are ignored by config + if ct.cfg.Features.IgnoreCheckinPolicyID || req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil { + return agent.PolicyRevisionIdx, nil, nil + } + policyID := *req.AgentPolicyId + revisionIDX := *req.PolicyRevisionIdx + + span, ctx := apm.StartSpan(ctx, "Process policy details", "process") + span.Context.SetLabel("agent_id", agent.Agent.ID) + span.Context.SetLabel(dl.FieldAgentPolicyID, policyID) + span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX) + defer span.End() + + // update agent doc if policy id or revision idx does not match + var opts []checkin.Option + if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { + opts = []checkin.Option{ + checkin.WithAgentPolicyID(policyID), + checkin.WithPolicyRevisionIDX(revisionIDX), } } - if effectivePolicyID != agent.PolicyID { - // different policyID so the revisionIdx gets cleared - effectiveRevIdx = 0 + // Policy reassign, subscribe to policy with revision 0 + if policyID != agent.PolicyID { + zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.") + return 0, opts, nil } - if effectivePolicyID != agent.PolicyID || effectiveRevIdx != agent.PolicyRevisionIdx { - // the agent is either on the wrong policy or the wrong revision of that policy - zlog.Info(). - Str("policy_id", agent.PolicyID). - Str("effective_policy_id", effectivePolicyID). - Int64("revision_idx", agent.PolicyRevisionIdx). - Int64("effective_revision_idx", effectiveRevIdx). 
- Msg("effective policy is different; resulting in a POLICY_CHANGE") - - // skipped when ignore checkin is enabled, because this never used to occur until this started - // using the policy information in the checkin body - if !ct.cfg.Features.IgnoreCheckinPolicyID { - // this invalidates any of the API keys - for outputName, output := range agent.Outputs { - if output.Type != policy.OutputTypeElasticsearch { - continue - } - err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName) - if err != nil { - // Only returns ErrUpdatingInactiveAgent - return nil, "", 0, fmt.Errorf("failed to update policy details: %w", err) - } + + // Check if the checkin revision_idx is greater than the latest available + latestRev := ct.pm.LatestRev(ctx, agent.PolicyID) + if latestRev != 0 && revisionIDX > latestRev { + revisionIDX = 0 // set return val to 0 so the agent gets latest available revision. + } + + // Update API keys if the policy has changed, or if the revision differs. 
+ if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx { + for outputName, output := range agent.Outputs { + if output.Type != policy.OutputTypeElasticsearch { + continue + } + if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil { + // Only returns ErrUpdatingInactiveAgent + return 0, nil, err } } } - - return checkinOpts, effectivePolicyID, effectiveRevIdx, nil + return revisionIDX, opts, nil } diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index 758c9c6fac..a02bd9c0e7 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/cache" "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" @@ -375,6 +376,17 @@ func TestResolveSeqNo(t *testing.T) { func TestProcessUpgradeDetails(t *testing.T) { esd := model.ESDocument{Id: "doc-ID"} + doc := bulk.UpdateFields{ + dl.FieldUpgradeDetails: &UpgradeDetails{ + ActionId: "test-action", + State: UpgradeDetailsStateUPGWATCHING, + }, + dl.FieldUpgradeAttempts: nil, + } + body, err := doc.Marshal() + if err != nil { + t.Fatalf("marshal error: %v", err) + } tests := []struct { name string agent *model.Agent @@ -387,9 +399,7 @@ func TestProcessUpgradeDetails(t *testing.T) { agent: &model.Agent{ESDocument: esd}, details: nil, bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noUpgradeDetailsMockCheck(t, mBulk) - return mBulk + return ftesting.NewMockBulk() }, cache: func() *testcache.MockCache { return testcache.NewMockCache() @@ -409,7 +419,7 @@ func TestProcessUpgradeDetails(t *testing.T) { t.Logf("bulk match unmarshal error: %v", err) return false } - return doc.Doc[dl.FieldUpgradeDetails] == nil && 
doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && getFieldStringValue(doc.Doc, dl.FieldUpgradedAt) != "" + return doc.Doc[dl.FieldUpgradeDetails] == nil && doc.Doc[dl.FieldUpgradeStartedAt] == nil && doc.Doc[dl.FieldUpgradeStatus] == nil && doc.Doc[dl.FieldUpgradedAt] != "" }), mock.Anything, mock.Anything).Return(nil) return mBulk }, @@ -463,7 +473,6 @@ func TestProcessUpgradeDetails(t *testing.T) { bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() mBulk.On("Search", mock.Anything, dl.FleetActions, mock.Anything, mock.Anything).Return(&es.ResultT{}, es.ErrNotFound) - noUpgradeDetailsMockCheck(t, mBulk) return mBulk }, cache: func() *testcache.MockCache { @@ -500,9 +509,7 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":"2023:01:02T12:00:00Z"}`)}, }, bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noUpgradeDetailsMockCheck(t, mBulk) - return mBulk + return ftesting.NewMockBulk() }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -519,9 +526,7 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":""}`)}, }, bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noUpgradeDetailsMockCheck(t, mBulk) - return mBulk + return ftesting.NewMockBulk() }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -537,9 +542,7 @@ func TestProcessUpgradeDetails(t *testing.T) { State: UpgradeDetailsStateUPGSCHEDULED, }, bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noUpgradeDetailsMockCheck(t, mBulk) - return mBulk + return ftesting.NewMockBulk() }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -557,7 +560,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - upgradeDetailsMockCheck(t, mBulk) + 
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -576,7 +579,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - upgradeDetailsMockCheck(t, mBulk) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -668,7 +671,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - upgradeDetailsMockCheck(t, mBulk) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -687,7 +690,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - upgradeDetailsMockCheck(t, mBulk) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -706,7 +709,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - upgradeDetailsMockCheck(t, mBulk) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -724,9 +727,7 @@ func TestProcessUpgradeDetails(t *testing.T) { Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":""}`)}, }, bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noUpgradeDetailsMockCheck(t, mBulk) - return mBulk + return ftesting.NewMockBulk() }, cache: func() *testcache.MockCache { mCache := testcache.NewMockCache() @@ -743,19 +744,7 @@ func TestProcessUpgradeDetails(t *testing.T) { }, bulk: 
func() *ftesting.MockBulk { mBulk := ftesting.NewMockBulk() - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { - doc := struct { - Doc map[string]interface{} `json:"doc"` - }{} - if err := json.Unmarshal(p, &doc); err != nil { - t.Logf("bulk match unmarshal error: %v", err) - return false - } - _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] - upgradeAttempts, ok := doc.Doc[dl.FieldUpgradeAttempts] - _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] - return upgradeDetails && ok && upgradeAttempts == nil && !noUpgradedAt - }), mock.Anything, mock.Anything).Return(nil) + mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", body, mock.Anything, mock.Anything).Return(nil) return mBulk }, cache: func() *testcache.MockCache { @@ -771,65 +760,23 @@ func TestProcessUpgradeDetails(t *testing.T) { mBulk := tc.bulk() mCache := tc.cache() - bc := checkin.NewBulk(mBulk) ct := &CheckinT{ cache: mCache, - bc: bc, bulker: mBulk, } - var err error - opts := make([]checkin.Option, 0, 3) - opts, err = ct.processUpgradeDetails(context.Background(), tc.agent, tc.details, opts) + err := ct.processUpgradeDetails(context.Background(), tc.agent, tc.details) if tc.err == nil { - require.NoError(t, err) + assert.NoError(t, err) } else { - require.ErrorIs(t, err, tc.err) - } - - err = bc.CheckIn(context.Background(), tc.agent.Id, opts...) 
- if err != nil { - require.NoError(t, err) + assert.ErrorIs(t, err, tc.err) } - mBulk.AssertExpectations(t) mCache.AssertExpectations(t) }) } } -func noUpgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { - doc := struct { - Doc map[string]interface{} `json:"doc"` - }{} - if err := json.Unmarshal(p, &doc); err != nil { - t.Logf("bulk match unmarshal error: %v", err) - return false - } - _, noUpgradeDetails := doc.Doc[dl.FieldUpgradeDetails] - _, noUpgradeStartedAt := doc.Doc[dl.FieldUpgradeStartedAt] - _, noUpgradeStatus := doc.Doc[dl.FieldUpgradeStatus] - _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] - return !noUpgradeDetails && !noUpgradeStartedAt && !noUpgradeStatus && !noUpgradedAt - }), mock.Anything, mock.Anything).Return(nil) -} - -func upgradeDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { - doc := struct { - Doc map[string]interface{} `json:"doc"` - }{} - if err := json.Unmarshal(p, &doc); err != nil { - t.Logf("bulk match unmarshal error: %v", err) - return false - } - _, upgradeDetails := doc.Doc[dl.FieldUpgradeDetails] - _, noUpgradedAt := doc.Doc[dl.FieldUpgradedAt] - return upgradeDetails && !noUpgradedAt - }), mock.Anything, mock.Anything).Return(nil) -} - func Test_CheckinT_writeResponse(t *testing.T) { tests := []struct { name string @@ -1137,12 +1084,11 @@ func TestValidateCheckinRequest(t *testing.T) { verCon := mustBuildConstraints("8.0.0") tests := []struct { - name string - req *http.Request - cfg *config.Server - currentMeta json.RawMessage - expErr error - expValid validatedCheckin + name string + req *http.Request + cfg *config.Server + expErr error + expValid validatedCheckin }{ { name: "Invalid JSON", @@ -1206,121 +1152,6 @@ func TestValidateCheckinRequest(t *testing.T) { rawMeta: []byte(`{"elastic": {"agent": {"id": 
"testid", "fips": true}}}`), }, }, - { - name: "local metadata matches", - req: &http.Request{ - Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "fips": true}}}}`)), - }, - expErr: nil, - currentMeta: json.RawMessage(`{"elastic": {"agent": {"id": "testid", "fips": true}}}`), - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - rawMeta: nil, // no need to update - }, - }, - { - name: "local metadata different JSON formatting", - req: &http.Request{ - // JSON with specific key ordering - Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": {"elastic": {"agent": {"id": "testid", "version": "8.0.0"}}, "host": {"hostname": "test-host"}}}`)), - }, - expErr: nil, - // Same content but different key ordering in JSON - when unmarshaled and compared - // with reflect.DeepEqual they should be equal, but raw bytes are different - currentMeta: json.RawMessage(`{"host":{"hostname":"test-host"},"elastic":{"agent":{"version":"8.0.0","id":"testid"}}}`), - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - rawMeta: nil, // should recognize as same content despite different formatting - }, - }, - { - name: "local metadata is empty string and agent has nil", - req: &http.Request{ - Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), - }, - expErr: nil, - currentMeta: nil, - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - // don't update metadata - rawMeta: nil, - }, - }, - { - name: "local metadata is empty string and agent has different value", - req: &http.Request{ - Body: 
io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": ""}`)), - }, - expErr: nil, - currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - // don't update metadata - rawMeta: nil, - }, - }, - { - name: "local metadata is null and agent has nil", - req: &http.Request{ - Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), - }, - expErr: nil, - currentMeta: nil, - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - // don't update metadata - rawMeta: nil, - }, - }, - { - name: "local metadata is null and agent has existing metadata", - req: &http.Request{ - Body: io.NopCloser(strings.NewReader(`{"status": "online", "message": "test message", "local_metadata": null}`)), - }, - expErr: nil, - currentMeta: json.RawMessage(`{"host": {"hostname": "test-host"}}`), - cfg: &config.Server{ - Limits: config.ServerLimits{ - CheckinLimit: config.Limit{ - MaxBody: 0, - }, - }, - }, - expValid: validatedCheckin{ - // don't update metadata - rawMeta: nil, - }, - }, } for _, tc := range tests { @@ -1329,7 +1160,7 @@ func TestValidateCheckinRequest(t *testing.T) { assert.NoError(t, err) wr := httptest.NewRecorder() logger := testlog.SetLogger(t) - valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: tc.currentMeta}) + valid, err := checkin.validateRequest(logger, wr, tc.req, time.Time{}, &model.Agent{LocalMetadata: json.RawMessage(`{}`)}) if tc.expErr == nil { assert.NoError(t, err) assert.Equal(t, tc.expValid.rawMeta, valid.rawMeta) @@ -1345,274 +1176,206 @@ func TestValidateCheckinRequest(t *testing.T) { } func TestProcessPolicyDetails(t *testing.T) { - esd := model.ESDocument{Id: 
"doc-ID"} policyID := "policy-id" revIDX2 := int64(2) tests := []struct { - name string - agent *model.Agent - req *CheckinRequest - policyID string - revIdx int64 - bulk func() *ftesting.MockBulk - ignoreCheckin bool + name string + agent *model.Agent + req *CheckinRequest + getPolicyMonitor func() *mockPolicyMonitor + revIDX int64 + returnsOpts bool + err error }{{ name: "request has no policy details", agent: &model.Agent{ - ESDocument: esd, - PolicyID: policyID, PolicyRevisionIdx: 1, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, - }, }, - req: &CheckinRequest{}, - policyID: policyID, - revIdx: 1, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noPolicyDetailsMockCheck(t, mBulk) - return mBulk + req: &CheckinRequest{}, + getPolicyMonitor: func() *mockPolicyMonitor { + return &mockPolicyMonitor{} }, + revIDX: 1, + returnsOpts: false, + err: nil, }, { name: "policy reassign detected", agent: &model.Agent{ - ESDocument: esd, + Agent: &model.AgentMetadata{ + ID: "agent-id", + }, PolicyID: "new-policy-id", AgentPolicyID: policyID, PolicyRevisionIdx: 2, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, - }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - policyID: policyID, - revIdx: 0, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - policyDetailsMockCheck(t, mBulk, policyID, revIDX2) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + return &mockPolicyMonitor{} }, + revIDX: 0, + returnsOpts: false, + err: nil, }, { - name: "no outputs", + name: "revision updated", agent: &model.Agent{ - ESDocument: esd, + Agent: &model.AgentMetadata{ + ID: "agent-id", + }, PolicyID: policyID, AgentPolicyID: policyID, - PolicyRevisionIdx: 2, - }, - req: &CheckinRequest{}, - policyID: policyID, - revIdx: 0, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - 
noPolicyDetailsMockCheck(t, mBulk) - return mBulk + PolicyRevisionIdx: 1, }, - }, { - name: "missing output APIKey", - agent: &model.Agent{ - ESDocument: esd, - PolicyID: policyID, - AgentPolicyID: policyID, - PolicyRevisionIdx: 2, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, - "remote": &model.PolicyOutput{}, - }, + req: &CheckinRequest{ + AgentPolicyId: &policyID, + PolicyRevisionIdx: &revIDX2, }, - req: &CheckinRequest{}, - policyID: policyID, - revIdx: 0, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noPolicyDetailsMockCheck(t, mBulk) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + pm := &mockPolicyMonitor{} + pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() + return pm }, + revIDX: 2, + returnsOpts: true, + err: nil, }, { - name: "revision updated", + name: "checkin revision is greater than the policy's latest revision", agent: &model.Agent{ - ESDocument: esd, + Agent: &model.AgentMetadata{ + ID: "agent-id", + }, PolicyID: policyID, AgentPolicyID: policyID, PolicyRevisionIdx: 1, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, - }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - policyID: policyID, - revIdx: revIDX2, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - policyDetailsMockCheck(t, mBulk, policyID, revIDX2) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + pm := &mockPolicyMonitor{} + pm.On("LatestRev", mock.Anything, policyID).Return(int64(1)).Once() + return pm }, + revIDX: 0, + returnsOpts: true, + err: nil, }, { - name: "agent does not have agent_policy_id present", + name: "agent_policy_id has changed", agent: &model.Agent{ - ESDocument: esd, - PolicyID: policyID, - PolicyRevisionIdx: 2, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, + Agent: 
&model.AgentMetadata{ + ID: "agent-id", }, + PolicyID: policyID, + AgentPolicyID: "old-policy-id", + PolicyRevisionIdx: 1, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - policyID: policyID, - revIdx: revIDX2, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - policyDetailsMockCheck(t, mBulk, policyID, revIDX2) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + pm := &mockPolicyMonitor{} + pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() + return pm }, + revIDX: 2, + returnsOpts: true, + err: nil, }, { - name: "details present with no changes for agent doc", + name: "agent does not have agent_policy_id present", agent: &model.Agent{ - ESDocument: esd, - AgentPolicyID: policyID, - PolicyID: policyID, - PolicyRevisionIdx: revIDX2, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, + Agent: &model.AgentMetadata{ + ID: "agent-id", }, + PolicyID: policyID, + PolicyRevisionIdx: 2, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - policyID: policyID, - revIdx: revIDX2, - bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - policyDetailsMockCheck(t, mBulk, policyID, revIDX2) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + pm := &mockPolicyMonitor{} + pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() + return pm }, + revIDX: 2, + returnsOpts: true, + err: nil, }, { - name: "details present ignore checkin", + name: "details present with no changes from agent doc", agent: &model.Agent{ - ESDocument: esd, + Agent: &model.AgentMetadata{ + ID: "agent-id", + }, AgentPolicyID: policyID, PolicyID: policyID, PolicyRevisionIdx: revIDX2, - Outputs: map[string]*model.PolicyOutput{ - "default": &model.PolicyOutput{ - APIKey: "123", - }, - }, }, req: &CheckinRequest{ AgentPolicyId: &policyID, PolicyRevisionIdx: &revIDX2, }, - policyID: policyID, - revIdx: revIDX2, - 
bulk: func() *ftesting.MockBulk { - mBulk := ftesting.NewMockBulk() - noPolicyDetailsMockCheck(t, mBulk) - return mBulk + getPolicyMonitor: func() *mockPolicyMonitor { + pm := &mockPolicyMonitor{} + pm.On("LatestRev", mock.Anything, policyID).Return(int64(2)).Once() + return pm }, - ignoreCheckin: true, + revIDX: 2, + returnsOpts: false, + err: nil, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { logger := testlog.SetLogger(t) + pm := tc.getPolicyMonitor() + checkin := &CheckinT{ + cfg: &config.Server{}, + bulker: ftesting.NewMockBulk(), + pm: pm, + } - mBulk := tc.bulk() - bc := checkin.NewBulk(mBulk) - cfg := &config.Server{} - if tc.ignoreCheckin { - cfg.Features.IgnoreCheckinPolicyID = true + revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, tc.agent, tc.req) + assert.Equal(t, tc.revIDX, revIDX) + if tc.returnsOpts { + assert.NotEmpty(t, opts) + } else { + assert.Empty(t, opts) } - ct := &CheckinT{ - bc: bc, - bulker: mBulk, - cfg: cfg, + if tc.err != nil { + assert.ErrorIs(t, tc.err, err) + } else { + assert.NoError(t, err) } - - opts := make([]checkin.Option, 0, 2) - opts, ePolicyID, eRevIdx, err := ct.processPolicyDetails(t.Context(), logger, tc.agent, tc.req, opts) - require.NoError(t, err) - assert.Equal(t, tc.policyID, ePolicyID) - assert.Equal(t, tc.revIdx, eRevIdx) - - err = bc.CheckIn(t.Context(), tc.agent.Id, opts...) 
- assert.NoError(t, err) - - mBulk.AssertExpectations(t) + pm.AssertExpectations(t) }) } -} -func noPolicyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk) { - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { - doc := struct { - Doc map[string]interface{} `json:"doc"` - }{} - if err := json.Unmarshal(p, &doc); err != nil { - t.Logf("bulk match unmarshal error: %v", err) - return false - } - _, noAgentPolicyID := doc.Doc[dl.FieldAgentPolicyID] - _, noPolicyRevisionIdx := doc.Doc[dl.FieldPolicyRevisionIdx] - return !noAgentPolicyID && !noPolicyRevisionIdx - }), mock.Anything, mock.Anything).Return(nil) -} - -func policyDetailsMockCheck(t *testing.T, mBulk *ftesting.MockBulk, policyID string, revIdx int64) { - mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.MatchedBy(func(p []byte) bool { - doc := struct { - Doc map[string]interface{} `json:"doc"` - }{} - if err := json.Unmarshal(p, &doc); err != nil { - t.Logf("bulk match unmarshal error: %v", err) - return false - } - oPolicyID, hasPolicy := doc.Doc[dl.FieldAgentPolicyID] - if !hasPolicy { - return false - } - oRevIdx, hasRevIdx := doc.Doc[dl.FieldPolicyRevisionIdx] - if !hasRevIdx { - return false - } - oRevIdxF, ok := oRevIdx.(float64) - if !ok { - return false - } - return oPolicyID == policyID && int64(oRevIdxF) == revIdx - }), mock.Anything, mock.Anything).Return(nil) -} - -func getFieldStringValue(doc map[string]interface{}, key string) string { - raw, ok := doc[key] - if ok { - s, ok := raw.(string) - if ok { - return s + t.Run("IgnoreCheckinPolicyID flag is set", func(t *testing.T) { + logger := testlog.SetLogger(t) + checkin := &CheckinT{ + cfg: &config.Server{ + Features: config.FeatureFlags{ + IgnoreCheckinPolicyID: true, + }, + }, } - } - return "" + revIDX, opts, err := checkin.processPolicyDetails(t.Context(), logger, + &model.Agent{ + PolicyID: policyID, + PolicyRevisionIdx: 1, + }, + &CheckinRequest{ + AgentPolicyId: &policyID, + 
PolicyRevisionIdx: &revIDX2, + }, + ) + assert.NoError(t, err) + assert.Equal(t, int64(1), revIDX) + assert.Empty(t, opts) + }) } diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index cfef6f3f29..a2304ffd85 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -9,6 +9,7 @@ import ( "context" _ "embed" "encoding/json" + "errors" "fmt" "sync" "time" @@ -41,225 +42,128 @@ func WithFlushInterval(d time.Duration) Opt { } // Option is the type for optional arguments for agent checkins. -type Option func(*checkinT) +type Option func(*pendingT) func WithStatus(status string) Option { - return func(state *checkinT) { - state.status.isSet = true - state.status.value = status + return func(pending *pendingT) { + pending.status = status } } func WithMessage(message string) Option { - return func(state *checkinT) { - state.message.isSet = true - state.message.value = message + return func(pending *pendingT) { + pending.message = message } } func WithUnhealthyReason(reason *[]string) Option { - return func(state *checkinT) { - state.unhealthyReason.isSet = true - state.unhealthyReason.value = reason + return func(pending *pendingT) { + pending.unhealthyReason = reason } } -func WithMeta(meta *json.RawMessage) Option { - return func(state *checkinT) { - state.meta.isSet = true - state.meta.value = meta +func WithMeta(meta []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.meta = meta } } func WithSeqNo(seqno sqn.SeqNo) Option { - return func(state *checkinT) { + return func(pending *pendingT) { if !seqno.IsSet() { return } - state.seqNo = seqno + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.seqNo = seqno } } func WithVer(ver string) Option { - return func(state *checkinT) { - state.ver.isSet = true - state.ver.value = ver + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + 
pending.extra.ver = ver } } -func WithComponents(components *json.RawMessage) Option { - return func(state *checkinT) { - state.components.isSet = true - state.components.value = components +func WithComponents(components []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.components = components } } func WithDeleteAudit(del bool) Option { - return func(state *checkinT) { + return func(pending *pendingT) { if !del { return } - state.deleteAudit = del + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.deleteAudit = del } } func WithAgentPolicyID(id string) Option { - return func(state *checkinT) { - state.agentPolicyID.isSet = true - state.agentPolicyID.value = id + return func(pending *pendingT) { + pending.agentPolicyID = id } } func WithPolicyRevisionIDX(idx int64) Option { - return func(state *checkinT) { - state.revisionIDX.isSet = true - state.revisionIDX.value = idx - } -} - -func WithUpgradeDetails(details *json.RawMessage) Option { - return func(state *checkinT) { - state.upgradeDetails.isSet = true - state.upgradeDetails.value = details - } -} - -func WithUpgradeStartedAt(startedAt *string) Option { - return func(state *checkinT) { - state.upgradeStartedAt.isSet = true - state.upgradeStartedAt.value = startedAt - } -} - -func WithUpgradeStatus(status *string) Option { - return func(state *checkinT) { - state.upgradeStatus.isSet = true - state.upgradeStatus.value = status + return func(pending *pendingT) { + pending.revisionIDX = idx } } -func WithUpgradeAttempts(attempts *[]string) Option { - return func(state *checkinT) { - state.upgradeAttempts.isSet = true - state.upgradeAttempts.value = attempts - } -} - -func WithUpgradedAt(upgradedAt *string) Option { - return func(state *checkinT) { - state.upgradedAt.isSet = true - state.upgradedAt.value = upgradedAt - } -} - -type setT[T any] struct { - isSet bool - value T -} - -type checkinT struct { - ts string 
- seqNo sqn.SeqNo - - ver setT[string] - status setT[string] - message setT[string] - unhealthyReason setT[*[]string] - - agentPolicyID setT[string] - revisionIDX setT[int64] - - meta setT[*json.RawMessage] - components setT[*json.RawMessage] - - upgradeDetails setT[*json.RawMessage] - upgradeStartedAt setT[*string] - upgradeStatus setT[*string] - upgradeAttempts setT[*[]string] - upgradedAt setT[*string] - +type extraT struct { + meta []byte + seqNo sqn.SeqNo + ver string + components []byte deleteAudit bool } -func (c *checkinT) toBody() ([]byte, error) { - fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: c.ts, - dl.FieldLastCheckin: c.ts, - } - if c.unhealthyReason.isSet { - fields[dl.FieldUnhealthyReason] = c.unhealthyReason.value - } - if c.status.isSet { - fields[dl.FieldLastCheckinStatus] = c.status.value - } - if c.message.isSet { - fields[dl.FieldLastCheckinMessage] = c.message.value - } - if c.agentPolicyID.isSet { - fields[dl.FieldAgentPolicyID] = c.agentPolicyID.value - } - if c.revisionIDX.isSet { - fields[dl.FieldPolicyRevisionIdx] = c.revisionIDX.value - } - // If the agent version is not empty it needs to be updated - // Assuming the agent can be upgraded keeping the same id, but incrementing the version - if c.ver.isSet && c.ver.value != "" { - fields[dl.FieldAgent] = map[string]interface{}{ - dl.FieldAgentVersion: c.ver, - } - } - - // Update local metadata if provided (and has a value) - if c.meta.isSet { - // Surprise: The json encoder compacts this raw JSON during - // the encode process, so there my be unexpected memory overhead: - // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 - fields[dl.FieldLocalMetadata] = c.meta.value - } - - // Update components if provided (and has a value) - if c.components.isSet { - fields[dl.FieldComponents] = c.components.value - } - - if c.upgradeDetails.isSet { - fields[dl.FieldUpgradeDetails] = c.upgradeDetails.value - } - if 
c.upgradeStartedAt.isSet { - fields[dl.FieldUpgradeStartedAt] = c.upgradeStartedAt.value - } - if c.upgradeStatus.isSet { - fields[dl.FieldUpgradeStatus] = c.upgradeStatus.value - } - if c.upgradeAttempts.isSet { - fields[dl.FieldUpgradeAttempts] = c.upgradeAttempts.value - } - if c.upgradedAt.isSet { - fields[dl.FieldUpgradedAt] = c.upgradedAt.value - } - - // If seqNo changed, set the field appropriately - if c.seqNo.IsSet() { - fields[dl.FieldActionSeqNo] = c.seqNo - } - return fields.Marshal() +// Minimize the size of this structure. +// There will be 10's of thousands of items +// in the map at any point. +type pendingT struct { + ts string + status string + message string + agentPolicyID string // may be empty + revisionIDX int64 + extra *extraT + unhealthyReason *[]string } -// Bulk handles checkins and keeps connected agents updated_at timestamp up-to-date. +// Bulk will batch pending checkins and update elasticsearch at a set interval. type Bulk struct { - opts optionsT - bulker bulk.Bulk - mut sync.RWMutex - connected map[string]struct{} + opts optionsT + bulker bulk.Bulk + mut sync.Mutex + pending map[string]pendingT + + ts string + unix int64 } func NewBulk(bulker bulk.Bulk, opts ...Opt) *Bulk { parsedOpts := parseOpts(opts...) return &Bulk{ - opts: parsedOpts, - bulker: bulker, - connected: make(map[string]struct{}), + opts: parsedOpts, + bulker: bulker, + pending: make(map[string]pendingT), } } @@ -275,81 +179,35 @@ func parseOpts(opts ...Opt) optionsT { return outOpts } -// Add adds the agent to the connected list. -func (bc *Bulk) Add(id string) { - bc.mut.Lock() - defer bc.mut.Unlock() - bc.connected[id] = struct{}{} -} +// Generate and cache timestamp on seconds change. +// Avoid thousands of formats of an identical string. +func (bc *Bulk) timestamp() string { + // WARNING: Expects mutex locked. 
+ now := time.Now() + if now.Unix() != bc.unix { + bc.unix = now.Unix() + bc.ts = now.UTC().Format(time.RFC3339) + } -// Remove removes the agent from the connected list. -func (bc *Bulk) Remove(id string) { - bc.mut.Lock() - defer bc.mut.Unlock() - delete(bc.connected, id) + return bc.ts } -// CheckIn records that the agent has checked-in. -// -// This does not call `Add`, the caller should also call that to record that the agent is connected so that bulk -// can keep the documents `updated_at` fields updated. -// WARNING: CheckIn will take ownership of fields, so do not use after passing in. -func (bc *Bulk) CheckIn(ctx context.Context, id string, opts ...Option) error { - now := time.Now().UTC().Format(time.RFC3339) - checkin := checkinT{ - ts: now, - } - for _, opt := range opts { - opt(&checkin) - } - - // update the agent document - body, err := checkin.toBody() - if err != nil { - return fmt.Errorf("could not marshall update body: %w", err) - } - bulkOpts := []bulk.Opt{bulk.WithRetryOnConflict(3)} - if checkin.seqNo.IsSet() { - bulkOpts = append(bulkOpts, bulk.WithRefresh()) +// CheckIn will add the agent (identified by id) to the pending set. +// The pending agents are sent to elasticsearch as a bulk update at each flush interval. +// NOTE: If Checkin is called after Run has returned it will just add the entry to the pending map and not do any operations, this may occur when the fleet-server is shutting down. +// WARNING: Bulk will take ownership of fields, so do not use after passing in. +func (bc *Bulk) CheckIn(id string, opts ...Option) error { + bc.mut.Lock() + pending := pendingT{ + ts: bc.timestamp(), } - // no deleteAudit so a simple update will work - if !checkin.deleteAudit { - err = bc.bulker.Update(ctx, dl.FleetAgents, id, body, bulkOpts...) 
- if err != nil { - return fmt.Errorf("failed to update document: %w", err) - } - return nil + for _, opt := range opts { + opt(&pending) } - // deleteAudit requires the usage of MUpdate to allow updating the body and then removing the needed fields. - action := &estypes.UpdateAction{ - Script: &estypes.Script{ - Lang: &scriptlanguage.Painless, - Source: &deleteAuditAttributesScript, - Options: map[string]string{}, - }, - } - actionBody, err := json.Marshal(&action) - if err != nil { - return fmt.Errorf("could not marshall script action: %w", err) - } - updates := []bulk.MultiOp{ - { - ID: id, - Index: dl.FleetAgents, - Body: body, - }, - { - ID: id, - Index: dl.FleetAgents, - Body: actionBody, - }, - } - _, err = bc.bulker.MUpdate(ctx, updates, bulkOpts...) - if err != nil { - return fmt.Errorf("failed update document and remove audit fields: %w", err) - } + bc.pending[id] = pending + bc.mut.Unlock() return nil } @@ -361,7 +219,7 @@ func (bc *Bulk) Run(ctx context.Context) error { for { select { case <-tick.C: - if err := bc.flushConnected(ctx); err != nil { + if err := bc.flush(ctx); err != nil { zerolog.Ctx(ctx).Error().Err(err).Msg("Eat bulk checkin error; Keep on truckin'") } @@ -371,30 +229,199 @@ func (bc *Bulk) Run(ctx context.Context) error { } } -// flushConnected does an update that updates all the updated_at fields on all currently -// connected agents. -func (bc *Bulk) flushConnected(ctx context.Context) error { - nowTimestamp := time.Now().UTC().Format(time.RFC3339) - fields := bulk.UpdateFields{ - dl.FieldUpdatedAt: nowTimestamp, - } - body, err := fields.Marshal() - if err != nil { - return fmt.Errorf("marshal updated_at field error: %w", err) +// flush sends the minium data needed to update records in elasticsearch. 
+func (bc *Bulk) flush(ctx context.Context) error { + start := time.Now() + + bc.mut.Lock() + pending := bc.pending + bc.pending = make(map[string]pendingT, len(pending)) + bc.mut.Unlock() + + if len(pending) == 0 { + return nil } - bc.mut.RLock() - updates := make([]bulk.MultiOp, 0, len(bc.connected)) - for id := range bc.connected { + + updates := make([]bulk.MultiOp, 0, len(pending)) + + simpleCache := make(map[pendingT][]byte) + + nowTimestamp := start.UTC().Format(time.RFC3339) + + var err error + var needRefresh bool + for id, pendingData := range pending { + var body []byte + if pendingData.extra == nil { + // agents that checkin without extra attributes are cachable + // Cacheable agents can share the same status, message, and unhealthy reason. Timestamps are ignored. + // This prevents an extra JSON serialization when agents have the same update body. + var ok bool + body, ok = simpleCache[pendingData] + if !ok { + body, err = toUpdateBody(nowTimestamp, pendingData) + if err != nil { + return err + } + simpleCache[pendingData] = body + } + } else if pendingData.extra.deleteAudit { + if pendingData.extra.seqNo.IsSet() { + needRefresh = true + } + // Use a script instead of a partial doc to update if attributes need to be removed + params, err := encodeParams(nowTimestamp, pendingData) + if err != nil { + return fmt.Errorf("unable to parse checkin details as params: %w", err) + } + action := &estypes.UpdateAction{ + Script: &estypes.Script{ + Lang: &scriptlanguage.Painless, + Source: &deleteAuditAttributesScript, + Options: map[string]string{}, + Params: params, + }, + } + body, err = json.Marshal(&action) + if err != nil { + return fmt.Errorf("could not marshall script action: %w", err) + } + } else { + if pendingData.extra.seqNo.IsSet() { + needRefresh = true + } + body, err = toUpdateBody(nowTimestamp, pendingData) + if err != nil { + return err + } + } + updates = append(updates, bulk.MultiOp{ ID: id, Body: body, Index: dl.FleetAgents, }) } - 
bc.mut.RUnlock() - _, err = bc.bulker.MUpdate(ctx, updates) - if err != nil { - return fmt.Errorf("mupdate error: %w", err) + + var opts []bulk.Opt + if needRefresh { + opts = append(opts, bulk.WithRefresh()) } - return nil + + _, err = bc.bulker.MUpdate(ctx, updates, opts...) + + zerolog.Ctx(ctx).Trace(). + Err(err). + Dur("rtt", time.Since(start)). + Int("cnt", len(updates)). + Bool("refresh", needRefresh). + Msg("Flush updates") + + return err +} + +func toUpdateBody(now string, pending pendingT) ([]byte, error) { + fields := bulk.UpdateFields{ + dl.FieldUpdatedAt: now, // Set "updated_at" to the current timestamp + dl.FieldLastCheckin: pending.ts, // Set the checkin timestamp + dl.FieldLastCheckinStatus: pending.status, // Set the pending status + dl.FieldLastCheckinMessage: pending.message, // Set the status message + dl.FieldUnhealthyReason: pending.unhealthyReason, + } + if pending.agentPolicyID != "" { + fields[dl.FieldAgentPolicyID] = pending.agentPolicyID + fields[dl.FieldPolicyRevisionIdx] = pending.revisionIDX + } + if pending.extra != nil { + // If the agent version is not empty it needs to be updated + // Assuming the agent can by upgraded keeping the same id, but incrementing the version + if pending.extra.ver != "" { + fields[dl.FieldAgent] = map[string]interface{}{ + dl.FieldAgentVersion: pending.extra.ver, + } + } + + // Update local metadata if provided + if pending.extra.meta != nil { + // Surprise: The json encoder compacts this raw JSON during + // the encode process, so there my be unexpected memory overhead: + // https://github.com/golang/go/blob/de5d7eccb99088e3ab42c0d907da6852d8f9cebe/src/encoding/json/encode.go#L503-L507 + fields[dl.FieldLocalMetadata] = json.RawMessage(pending.extra.meta) + } + + // Update components if provided + if pending.extra.components != nil { + fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) + } + + // If seqNo changed, set the field appropriately + if pending.extra.seqNo.IsSet() { + 
fields[dl.FieldActionSeqNo] = pending.extra.seqNo + } + } + return fields.Marshal() +} + +func encodeParams(now string, data pendingT) (map[string]json.RawMessage, error) { + var ( + tsNow json.RawMessage + ts json.RawMessage + status json.RawMessage + message json.RawMessage + reason json.RawMessage + + // optional attributes below + policyID json.RawMessage + revisionIDX json.RawMessage + ver json.RawMessage + meta json.RawMessage + components json.RawMessage + isSet json.RawMessage + seqNo json.RawMessage + + err error + ) + tsNow, err = json.Marshal(now) + Err := errors.Join(err) + ts, err = json.Marshal(data.ts) + Err = errors.Join(Err, err) + status, err = json.Marshal(data.status) + Err = errors.Join(Err, err) + message, err = json.Marshal(data.message) + Err = errors.Join(Err, err) + reason, err = json.Marshal(data.unhealthyReason) + Err = errors.Join(Err, err) + policyID, err = json.Marshal(data.agentPolicyID) + Err = errors.Join(Err, err) + revisionIDX, err = json.Marshal(data.revisionIDX) + Err = errors.Join(Err, err) + ver, err = json.Marshal(data.extra.ver) + Err = errors.Join(Err, err) + isSet, err = json.Marshal(data.extra.seqNo.IsSet()) + Err = errors.Join(Err, err) + seqNo, err = json.Marshal(data.extra.seqNo) + Err = errors.Join(Err, err) + if data.extra.meta != nil { + meta = data.extra.meta + } + if data.extra.components != nil { + components = data.extra.components + } + if Err != nil { + return nil, Err + } + return map[string]json.RawMessage{ + "Now": tsNow, + "TS": ts, + "Status": status, + "Message": message, + "UnhealthyReason": reason, + "PolicyID": policyID, + "RevisionIDX": revisionIDX, + "Ver": ver, + "Meta": meta, + "Components": components, + "SeqNoSet": isSet, + "SeqNo": seqNo, + }, nil } diff --git a/internal/pkg/checkin/bulk_test.go b/internal/pkg/checkin/bulk_test.go index e6415b1c5e..d0cb344d54 100644 --- a/internal/pkg/checkin/bulk_test.go +++ b/internal/pkg/checkin/bulk_test.go @@ -8,14 +8,13 @@ import ( "context" 
"encoding/json" "testing" + "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" - estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" - "github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/scriptlanguage" "github.com/google/go-cmp/cmp" "github.com/rs/xid" @@ -29,8 +28,21 @@ import ( // Test with seq no // matchOp is used with mock.MatchedBy to match and validate the operation -func matchOp(tb testing.TB, c testcase) func([]byte) bool { - return func(body []byte) bool { +func matchOp(tb testing.TB, c testcase, ts time.Time) func(ops []bulk.MultiOp) bool { + return func(ops []bulk.MultiOp) bool { + if len(ops) != 1 { + return false + } + if ops[0].ID != c.id { + return false + } + if ops[0].Index != dl.FleetAgents { + return false + } + tb.Log("Operation match! validating details...") + + // Decode and match operation + // NOTE putting the extra validation here seems strange, maybe we should read the args in the test body intstead? 
type updateT struct { LastCheckin string `json:"last_checkin"` Status string `json:"last_checkin_status"` @@ -42,14 +54,14 @@ func matchOp(tb testing.TB, c testcase) func([]byte) bool { } m := make(map[string]updateT) - err := json.Unmarshal(body, &m) - require.NoErrorf(tb, err, "unable to validate operation body %s", string(body)) + err := json.Unmarshal(ops[0].Body, &m) + require.NoErrorf(tb, err, "unable to validate operation body %s", string(ops[0].Body)) sub, ok := m["doc"] require.True(tb, ok, "unable to validate operation: expected doc") - assert.NotEmpty(tb, sub.LastCheckin) - assert.Equal(tb, sub.LastCheckin, sub.UpdatedAt) // should have same timestamp + validateTimestamp(tb, ts.Truncate(time.Second), sub.LastCheckin) + validateTimestamp(tb, ts.Truncate(time.Second), sub.UpdatedAt) assert.Equal(tb, c.policyID, sub.AgentPolicyID) assert.Equal(tb, c.revisionIDX, sub.RevisionIDX) if c.seqno != nil { @@ -59,38 +71,13 @@ func matchOp(tb testing.TB, c testcase) func([]byte) bool { } if c.meta != nil { - assert.Equal(tb, c.meta, sub.Meta) + assert.Equal(tb, json.RawMessage(c.meta), sub.Meta) } assert.Equal(tb, c.status, sub.Status) return true } } -// matchMOp is used with mock.MatchedBy to match and validate a multi update operation -func matchMOp(tb testing.TB, c testcase) func([]bulk.MultiOp) bool { - return func(updates []bulk.MultiOp) bool { - require.Len(tb, updates, 2) - - // first is the same body as matchOp checks - assert.Equal(tb, c.id, updates[0].ID) - assert.Equal(tb, dl.FleetAgents, updates[0].Index) - assert.True(tb, matchOp(tb, c)(updates[0].Body)) - - // second is the painless script to remove the audit fields - assert.Equal(tb, c.id, updates[1].ID) - assert.Equal(tb, dl.FleetAgents, updates[1].Index) - - var action estypes.UpdateAction - err := json.Unmarshal(updates[1].Body, &action) - require.NoError(tb, err, "second update body not an update action") - require.NotNil(tb, action.Script) - assert.Equal(tb, &scriptlanguage.Painless, 
action.Script.Lang) - assert.Equal(tb, &deleteAuditAttributesScript, action.Script.Source) - - return true - } -} - type testcase struct { name string id string @@ -98,15 +85,16 @@ type testcase struct { message string policyID string revisionIDX int64 - meta json.RawMessage - components json.RawMessage + meta []byte + components []byte seqno sqn.SeqNo ver string unhealthyReason *[]string - deleteAudit bool } -func TestBulkCheckin(t *testing.T) { +func TestBulkSimple(t *testing.T) { + start := time.Now() + const ver = "8.9.0" cases := []testcase{{ name: "Simple case", @@ -141,15 +129,6 @@ func TestBulkCheckin(t *testing.T) { meta: []byte(`{"hey":"now","brown":"cow"}`), components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), ver: ver, - }, { - name: "Multi field case w/ delete audit", - id: "multiFieldId", - status: "online", - message: "message", - meta: []byte(`{"hey":"now","brown":"cow"}`), - components: []byte(`[{"id":"winlog-default","type":"winlog"}]`), - ver: ver, - deleteAudit: true, }, { name: "Multi field nested case", id: "multiFieldNestedId", @@ -189,22 +168,18 @@ func TestBulkCheckin(t *testing.T) { t.Run(c.name, func(t *testing.T) { ctx := testlog.SetLogger(t).WithContext(t.Context()) mockBulk := ftesting.NewMockBulk() - if c.deleteAudit { - mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchMOp(t, c)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() - } else { - mockBulk.On("Update", mock.Anything, dl.FleetAgents, c.id, mock.MatchedBy(matchOp(t, c)), mock.Anything).Return(nil).Once() - } + mockBulk.On("MUpdate", mock.Anything, mock.MatchedBy(matchOp(t, c, start)), mock.Anything).Return([]bulk.BulkIndexerResponseItem{}, nil).Once() bc := NewBulk(mockBulk) opts := []Option{WithStatus(c.status), WithMessage(c.message)} if c.policyID != "" { opts = append(opts, WithAgentPolicyID(c.policyID), WithPolicyRevisionIDX(c.revisionIDX)) } - if len(c.meta) > 0 { - opts = append(opts, WithMeta(&c.meta)) + if c.meta != nil { + 
opts = append(opts, WithMeta(c.meta))
 			}
-			if len(c.components) > 0 {
-				opts = append(opts, WithComponents(&c.components))
+			if c.components != nil {
+				opts = append(opts, WithComponents(c.components))
 			}
 			if c.seqno != nil {
 				opts = append(opts, WithSeqNo(c.seqno))
@@ -215,11 +190,10 @@ func TestBulkCheckin(t *testing.T) {
 			if c.unhealthyReason != nil {
 				opts = append(opts, WithUnhealthyReason(c.unhealthyReason))
 			}
-			if c.deleteAudit {
-				opts = append(opts, WithDeleteAudit(true))
-			}
 
-			err := bc.CheckIn(ctx, c.id, opts...)
+			err := bc.CheckIn(c.id, opts...)
+			require.NoError(t, err)
+			err = bc.flush(ctx)
 			require.NoError(t, err)
 
 			mockBulk.AssertExpectations(t)
@@ -227,6 +201,12 @@
 	}
 }
 
+func validateTimestamp(tb testing.TB, start time.Time, ts string) {
+	t1, err := time.Parse(time.RFC3339, ts)
+	require.NoErrorf(tb, err, "expected %q to be in RFC 3339 format", ts)
+	require.False(tb, start.After(t1), "timestamp in the past")
+}
+
 func benchmarkBulk(n int, b *testing.B) {
 	mockBulk := ftesting.NewMockBulk()
 	bc := NewBulk(mockBulk)
@@ -241,7 +221,10 @@
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
 		for _, id := range ids {
-			bc.Add(id)
+			err := bc.CheckIn(id)
+			if err != nil {
+				b.Fatal(err)
+			}
 		}
 	}
 }
@@ -264,11 +247,14 @@
 	for i := 0; i < b.N; i++ {
 		b.StopTimer()
 		for _, id := range ids {
-			bc.Add(id)
+			err := bc.CheckIn(id) // TODO this benchmark is not very interesting as the simplecache is used
+			if err != nil {
+				b.Fatal(err)
+			}
 		}
 		b.StartTimer()
 
-		err := bc.flushConnected(ctx)
+		err := bc.flush(ctx)
 		if err != nil {
 			b.Fatal(err)
 		}
diff --git a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless
index 8774fee674..3b0f111865 100644
--- a/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless
+++ b/internal/pkg/checkin/deleteAuditFieldsOnCheckin.painless
@@ -1,3 +1,24 @@
+ctx._source.last_checkin = params.TS; +ctx._source.updated_at = params.Now; +ctx._source.last_checkin_status = params.Status; +ctx._source.last_checkin_message = params.Message; +ctx._source.unhealthy_reason = params.UnhealthyReason; +if (params.PolicyID != "") { + ctx._source.agent_policy_id = params.PolicyID; + ctx._source.policy_revision_idx = params.RevisionIDX; +} +if (params.Ver != "") { + ctx._source.agent.version = params.Ver; +} +if (params.Meta != null) { + ctx._source.local_metadata = params.Meta; +} +if (params.Components != null) { + ctx._source.components = params.Components; +} +if (params.SeqNoSet) { + ctx._source.action_seq_no = params.SeqNo; +} ctx._source.remove('audit_unenrolled_reason'); ctx._source.remove('audit_unenrolled_time'); ctx._source.remove('unenrolled_at'); // audit/unenroll no longer sets this but it did in the 8.16.x releases, so we need to clear it. diff --git a/internal/pkg/policy/sub.go b/internal/pkg/policy/sub.go index 31f8830bc8..e767c35f29 100644 --- a/internal/pkg/policy/sub.go +++ b/internal/pkg/policy/sub.go @@ -77,7 +77,7 @@ func (n *subT) isEmpty() bool { func (n *subT) isUpdate(policy *model.Policy) bool { pRevIdx := policy.RevisionIdx - return pRevIdx != n.revIdx + return pRevIdx > n.revIdx } // Output returns a new policy that needs to be sent based on the current subscription. 
diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 6d5b155f55..ef8aeb29b8 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -521,7 +521,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro ad := action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst, bulker) g.Go(loggedRunFunc(ctx, "Action dispatcher", ad.Run)) - bc := checkin.NewBulk(bulker, checkin.WithFlushInterval(cfg.Inputs[0].Server.Timeouts.CheckinTimestamp)) + bc := checkin.NewBulk(bulker) g.Go(loggedRunFunc(ctx, "Bulk checkin", bc.Run)) ct, err := api.NewCheckinT(f.verCon, &cfg.Inputs[0].Server, f.cache, bc, pm, am, ad, bulker) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 36f565bbcc..7bac90de52 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -1685,7 +1685,7 @@ func Test_SmokeTest_AuditUnenroll(t *testing.T) { _, ok = obj["audit_unenrolled_reason"] _, ok2 := obj["unenrolled_at"] return !ok && !ok2 - }, time.Second*10, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. agent doc: %v", obj) + }, time.Second*20, time.Second, "agent document should not have the audit_unenrolled_reason or unenrolled_at attributes. 
agent doc: %v", obj) cancel() srv.waitExit() //nolint:errcheck // test case } diff --git a/testing/e2e/api_version/client_api_current.go b/testing/e2e/api_version/client_api_current.go index 459cc075d1..7ed80cccdf 100644 --- a/testing/e2e/api_version/client_api_current.go +++ b/testing/e2e/api_version/client_api_current.go @@ -465,7 +465,7 @@ func (tester *ClientAPITester) TestEnrollAuditUnenroll() { tester.Require().NoError(err) _, ok := obj.Source["audit_unenrolled_reason"] return !ok - }, time.Second*10, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") + }, time.Second*20, time.Second, "agent document in elasticsearch should not have audit_unenrolled_reason attribute") } // TestEnrollUpgradeAction_MetadataDownloadRate_String checks that download metadata rates can be sent as strings @@ -675,21 +675,15 @@ func (tester *ClientAPITester) TestCheckinWithPolicyIDRevision() { } } tester.Require().True(found, "unable to find POLICY_CHANGE action in 4th checkin response") - tester.Require().Equal(int64(iRev), int64(policyChange.Policy.Revision), "Expected POLICY_CHANGE action to be for updated policy revision") + revIDX = int64(policyChange.Policy.Revision) + tester.Require().Equal(int64(iRev), revIDX, "Expected POLICY_CHANGE action to be for updated policy revision") - // Document should reflect the current state of the Elastic Agent, which is always the revision_idx it used - // to check-in. Only an ACK or the next check-in that states the next revision would it be updated to the - // revision_idx that comes from the policy. tester.Require().EventuallyWithT(func(c *assert.CollectT) { agent := tester.GetAgent(ctx, agentID) require.Equal(c, policyID, agent.AgentPolicyID) require.Equal(c, revIDX, int64(agent.Revision)) }, time.Second*10, time.Second) - // Bump the revision_idx to the same as the policy to signal to Fleet that the Elastic Agent is now - // running the latest revision that it was sent. 
- revIDX = int64(policyChange.Policy.Revision) - // Do a normal checkin to "reset" to latest revision_idx // no actions are returned // Manage any API keys if present