diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 286151c5251..c5923f1a25a 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -6,6 +6,7 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits] [float] ==== Breaking Changes - The unsupported apm_data_stream_migration ingest pipeline has been removed {pull}12102[12102]. +- "publish_ready" is always false in the "GET /" response until events are received by apm-server {pull}TODO[TODO] [float] ==== Bug fixes @@ -23,3 +24,4 @@ https://github.com/elastic/apm-server/compare/8.11\...main[View commits] - Add support for returning partial success response in OTLP input {pull}11883[11883] - Setting event timestamp from OTel observed timestamp when needed {pull}11935[11935] - Field mappings have been added for various formerly unindexed fields {pull}12102[12102] +- We now assert that index templates are installed by attempting to create data streams {TODO}[TODO] diff --git a/internal/beater/beater.go b/internal/beater/beater.go index aee736e3fa1..330753510dd 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "runtime" + "sync" "time" "github.com/dustin/go-humanize" @@ -42,11 +43,9 @@ import ( _ "google.golang.org/grpc/encoding/gzip" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/outputs" - esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" agentconfig "github.com/elastic/elastic-agent-libs/config" @@ -347,7 +346,13 @@ func (s *Runner) Run(ctx context.Context) error { // any events to Elasticsearch before the integration is ready. publishReady := make(chan struct{}) drain := make(chan struct{}) + startWaitReady := make(chan struct{}) + var waitReadyOnce sync.Once g.Go(func() error { + select { + case <-ctx.Done(): + case <-startWaitReady: + } if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { // One or more preconditions failed; drop events. close(drain) @@ -358,24 +363,25 @@ func (s *Runner) Run(ctx context.Context) error { close(publishReady) return nil }) - callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error { + prePublish := func(ctx context.Context) error { + waitReadyOnce.Do(func() { + close(startWaitReady) + }) select { + case <-ctx.Done(): + return ctx.Err() + case <-drain: + return errServerShuttingDown case <-publishReady: - return nil - default: } - return errors.New("not ready for publishing events") - }) - if err != nil { - return err + return nil } - defer esoutput.DeregisterConnectCallback(callbackUUID) newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) { httpTransport, err := elasticsearch.NewHTTPTransport(cfg) if err != nil { return nil, err } - transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain} + transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish} return elasticsearch.NewClientParams(elasticsearch.ClientParams{ Config: cfg, Transport: transport, @@ -432,7 +438,7 @@ func (s *Runner) Run(ctx context.Context) error { // Create the BatchProcessor chain that is used to process all events, // including the metrics aggregated by APM Server. finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor( - tracer, newElasticsearchClient, memLimitGB, + tracer, newElasticsearchClient, memLimitGB, prePublish, ) if err != nil { return err @@ -647,7 +653,9 @@ func (s *Runner) waitReady( return errors.New("cannot wait for integration without either Kibana or Elasticsearch config") } preconditions = append(preconditions, func(ctx context.Context) error { - return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger) + return checkIndexTemplatesInstalled( + ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger, + ) }) } @@ -672,12 +680,13 @@ func (s *Runner) newFinalBatchProcessor( tracer *apm.Tracer, newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error), memLimit float64, + prePublish func(context.Context) error, ) (modelpb.BatchProcessor, func(context.Context) error, error) { monitoring.Default.Remove("libbeat") libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat") if s.elasticsearchOutputConfig == nil { - return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry) + return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry) } stateRegistry := monitoring.GetNamespace("state").GetRegistry() @@ -829,6 +838,7 @@ func docappenderConfig( func (s *Runner) newLibbeatFinalBatchProcessor( tracer *apm.Tracer, + prePublish func(context.Context) error, libbeatMonitoringRegistry *monitoring.Registry, ) (modelpb.BatchProcessor, func(context.Context) error, error) { // When the publisher stops cleanly it will close its pipeline client, @@ -889,7 +899,13 @@ func (s *Runner) newLibbeatFinalBatchProcessor( } return acker.Wait(ctx) } - return publisher, stop, nil + processor := modelprocessor.Chained{ + modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { + return prePublish(ctx) + }), + publisher, + } + return processor, stop, nil } const sourcemapIndex = ".apm-source-map" diff --git a/internal/beater/beatertest/server.go b/internal/beater/beatertest/server.go index 9812e3cb6e7..63109ac25ae 100644 --- a/internal/beater/beatertest/server.go +++ b/internal/beater/beatertest/server.go @@ -108,7 +108,8 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server { require.NoError(t, err) if !outputConfig.Output.IsSet() { err = cfg.Merge(map[string]any{ - "output.null": map[string]any{}, + "output.null": map[string]any{}, + "queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"}, }) require.NoError(t, err) } diff --git a/internal/beater/checkintegration.go b/internal/beater/checkintegration.go index f0fe62ebc59..a86edb4b120 100644 --- a/internal/beater/checkintegration.go +++ b/internal/beater/checkintegration.go @@ -24,23 +24,26 @@ import ( "fmt" "io" "net/http" + "strings" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/kibana" - "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream" + "github.com/elastic/go-elasticsearch/v8/typedapi/types" ) -// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana -// and/or Elasticsearch, returning nil if and only if it is installed. -func checkIntegrationInstalled( +// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the +// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch, +// returning nil if and only if it is installed. +func checkIndexTemplatesInstalled( ctx context.Context, kibanaClient *kibana.Client, esClient *elasticsearch.Client, + namespace string, logger *logp.Logger, ) (err error) { defer func() { @@ -53,37 +56,36 @@ func checkIntegrationInstalled( } } }() + if esClient != nil { + installed, err := checkCreateDataStream(ctx, esClient, namespace) + if err != nil { + return fmt.Errorf("error checking Elasticsearch index template setup: %w", err) + } + if !installed { + return errors.New("index templates not installed") + } + return nil + } if kibanaClient != nil { - installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger) + installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger) if err != nil { // We only return the Kibana error if we have no Elasticsearch client, // as we may not have sufficient privileges to query the Fleet API. if esClient == nil { return fmt.Errorf("error querying Kibana for integration package status: %w", err) } - } else if !installed { + } + if !installed { // We were able to query Kibana, but the package is not yet installed. - // We should continue querying the package status via Kibana, as it is - // more authoritative than checking for index template installation. return errors.New("integration package not yet installed") } - // Fall through and query Elasticsearch (if we have a client). Kibana may prematurely - // report packages as installed: https://github.com/elastic/kibana/issues/108649 - } - if esClient != nil { - installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger) - if err != nil { - return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err) - } else if !installed { - return errors.New("integration index templates not installed") - } } return nil } // checkIntegrationInstalledKibana checks if the APM integration package // is installed by querying Kibana. -func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) { +func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) { resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil) if err != nil { return false, err @@ -106,41 +108,22 @@ func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.C return result.Response.Status == "installed", nil } -func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) { - // TODO(axw) generate the list of expected index templates. - templates := []string{ - "traces-apm", - "traces-apm.sampled", - "metrics-apm.app", - "metrics-apm.internal", - "logs-apm.error", - } - for _, intervals := range []string{"1m", "10m", "60m"} { - for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} { - templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals)) - } - } - // IndicesGetIndexTemplateRequest accepts a slice of template names, - // but the REST API expects just one index template name. Query them - // in parallel. - g, ctx := errgroup.WithContext(ctx) - for _, template := range templates { - template := template // copy for closure - g.Go(func() error { - req := esapi.IndicesGetIndexTemplateRequest{Name: template} - resp, err := req.Do(ctx, esClient) - if err != nil { - return err +// checkCreateDataStream attempts to create a traces-apm- data stream, +// returning an error if it could not be created. This will fail if there is no +// index template matching the pattern. +func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) { + if _, err := createdatastream.New(esClient).Name("traces-apm-" + namespace).Do(ctx); err != nil { + var esError *types.ElasticsearchError + if errors.As(err, &esError) { + cause := esError.ErrorCause + if cause.Type == "resource_already_exists_exception" { + return true, nil } - defer resp.Body.Close() - - if resp.IsError() { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body)) + if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") { + return false, nil } - return nil - }) + } + return false, err } - err := g.Wait() - return err == nil, err + return true, nil } diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go index 1249a41c4af..98429cd9247 100644 --- a/internal/beater/server_test.go +++ b/internal/beater/server_test.go @@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) { func TestServerWaitForIntegrationKibana(t *testing.T) { var requests int64 - requestCh := make(chan struct{}) + requestCh := make(chan struct{}, 3) mux := http.NewServeMux() mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"version":{"number":"1.2.3"}}`)) @@ -363,6 +363,14 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { }, }))) + // Send some events to the server. They should be accepted and enqueued. + req := makeTransactionRequest(t, srv.URL) + req.Header.Add("Content-Type", "application/x-ndjson") + resp, err := srv.Client.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusAccepted, resp.StatusCode) + resp.Body.Close() + timeout := time.After(10 * time.Second) for i := 0; i < 3; i++ { select { @@ -387,8 +395,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { func TestServerWaitForIntegrationElasticsearch(t *testing.T) { var mu sync.Mutex - var tracesRequests int - tracesRequestsCh := make(chan int) + var createDataStreamRequests int + createDataStreamRequestsCh := make(chan int) bulkCh := make(chan struct{}, 1) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -397,17 +405,24 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { // elasticsearch client to send bulk requests. fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) { mu.Lock() defer mu.Unlock() - template := path.Base(r.URL.Path) - if template == "traces-apm" { - tracesRequests++ - if tracesRequests == 1 { - w.WriteHeader(404) - } - tracesRequestsCh <- tracesRequests + name := path.Base(r.URL.Path) + if name != "traces-apm-testing" { + panic("unexpected data stream name: " + name) + } + createDataStreamRequests++ + switch createDataStreamRequests { + case 1: + w.WriteHeader(500) + case 2: + w.WriteHeader(400) + w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`)) + case 3: + w.Write([]byte(`{"acknowledged":true}`)) } + createDataStreamRequestsCh <- createDataStreamRequests }) mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { select { @@ -422,6 +437,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { "apm-server": map[string]interface{}{ "wait_ready_interval": "100ms", "data_streams.wait_for_integration": true, + "data_streams.namespace": "testing", }, "output.elasticsearch": map[string]interface{}{ "hosts": []string{elasticsearchServer.URL}, @@ -456,8 +472,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { var done bool for !done { select { - case n := <-tracesRequestsCh: - done = n == 2 + case n := <-createDataStreamRequestsCh: + done = n == 3 case <-timeout: t.Fatal("timed out waiting for request") } @@ -471,7 +487,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { } logs := srv.Logs.FilterMessageSnippet("please install the apm integration") - assert.Len(t, logs.All(), 1, "couldn't find remediation message logs") + assert.Len(t, logs.All(), 2, "couldn't find remediation message logs") // Healthcheck should now report that the server is publish-ready. resp, err = srv.Client.Get(srv.URL + api.RootPath) @@ -490,7 +506,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) { // elasticsearch client to send bulk requests. fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(404) }) mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/beater/waitready.go b/internal/beater/waitready.go index 7bfaa6b04fd..50811a4d688 100644 --- a/internal/beater/waitready.go +++ b/internal/beater/waitready.go @@ -21,6 +21,7 @@ import ( "context" "errors" "net/http" + "path" "time" "go.elastic.co/apm/v2" @@ -71,27 +72,21 @@ func waitReady( } // waitReadyRoundTripper wraps a *net/http.Transport, ensuring the server's -// indexing preconditions have been satisfied by waiting for "ready" channel -// to be signalled, prior to allowing any requests through. -// -// This is used to prevent elasticsearch clients from proceeding with requests -// until the APM integration is installed to ensure we don't index any documents -// prior to the data stream index templates being ready. +// indexing preconditions have been satisfied prior to allowing any indexing +// requests through. This is used to ensure we don't index any documents prior +// to the data stream index templates being ready. type waitReadyRoundTripper struct { *http.Transport - ready <-chan struct{} - drain <-chan struct{} + onBulk func(context.Context) error } var errServerShuttingDown = errors.New("server shutting down") func (c *waitReadyRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - select { - case <-c.drain: - return nil, errServerShuttingDown - case <-c.ready: - case <-r.Context().Done(): - return nil, r.Context().Err() + if path.Base(r.URL.Path) == "_bulk" { + if err := c.onBulk(r.Context()); err != nil { + return nil, err + } } return c.Transport.RoundTrip(r) }