From 52849ff30f6211afacdbecc838a645eac93cb6da Mon Sep 17 00:00:00 2001 From: SylvainJuge <763082+SylvainJuge@users.noreply.github.com> Date: Mon, 15 Jan 2024 15:25:45 +0100 Subject: [PATCH] remove-async-handling (#12386) * update apm-data to latest main * remove test case * update notice * update apm-data to latest version * remove async * update apm-data + tidy * additional fixes * build: do not bump unrelated libraries * build: update notice file * test: remove async intake system tests --------- Co-authored-by: Silvia Mitter Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com> --- NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- internal/beater/api/intake/handler.go | 37 +--------- internal/beater/api/intake/handler_test.go | 12 +--- systemtest/intake.go | 5 -- systemtest/intake_async_test.go | 81 ---------------------- 7 files changed, 7 insertions(+), 138 deletions(-) delete mode 100644 systemtest/intake_async_test.go diff --git a/NOTICE.txt b/NOTICE.txt index 4a38d9aa349..c6c2df9fcac 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -362,11 +362,11 @@ these terms. -------------------------------------------------------------------------------- Dependency : github.com/elastic/apm-data -Version: v0.1.1-0.20231212041654-b2a4dabeb6e3 +Version: v0.1.1-0.20240111111310-80b6af8d97e1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20231212041654-b2a4dabeb6e3/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/apm-data@v0.1.1-0.20240111111310-80b6af8d97e1/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 997bad275d4..40464723847 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b github.com/dustin/go-humanize v1.0.1 github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6 - github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3 + github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1 github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd github.com/elastic/elastic-agent-client/v7 v7.6.0 github.com/elastic/elastic-agent-libs v0.7.3 diff --git a/go.sum b/go.sum index 087cc8ceb5b..0f328e87185 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6 h1:Js+C3HEE0a5BDFmhEmJV/Uo4uzj/paHjd7yl6+KYguw= github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6/go.mod h1:ba3gaJCuhxXN/O5AuiI56xxd6DukQdVOK0NfpzBntNo= -github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3 h1:pavWRIAjsPregjeLBOFlm6UBDbsVYXGuQh+ancXJNiw= -github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3/go.mod h1:z4iJVl8vyQa5v5o7UapWGHTsycBKsKfJfILuf2TZpYo= +github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1 h1:Wvt3GnXk1KzWeC8E/PRpQgbHAQDQmFvc7wJlnW9viCk= +github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1/go.mod h1:z4iJVl8vyQa5v5o7UapWGHTsycBKsKfJfILuf2TZpYo= github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd h1:rC9i9rR72M3X3w11rINvTnJ12WsRu4wHlMBswfZjdAs= github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd/go.mod h1:rNNO1YXUHEa0qVE9csGluxw/OohHciGvMzAUAjK1uS4= github.com/elastic/elastic-agent-autodiscover v0.6.6 h1:P1y0dDpbhJc7Uw/xe85irPEad4Vljygc+y4iSxtqW7A= diff --git a/internal/beater/api/intake/handler.go b/internal/beater/api/intake/handler.go index 9e1cf391fad..4c64d5af54b 100644 --- a/internal/beater/api/intake/handler.go +++ b/internal/beater/api/intake/handler.go @@ -21,11 +21,8 @@ import ( "errors" "fmt" "net/http" - "strconv" "strings" - "go.elastic.co/apm/v2" - "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/apm-data/input/elasticapm" @@ -69,27 +66,6 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada return } - // Async can be set by clients to request non-blocking event processing, - // returning immediately with an error `elasticapm.ErrQueueFull` when it - // can't be serviced. - // - // Async processing has weaker guarantees for the client since any - // errors while processing the batch cannot be communicated back to the - // client. - // - // Instead, errors are logged by the APM Server. - async := asyncRequest(c.Request) - - // Create a new detached context when asynchronous processing is set, - // decoupling the context from its deadline, which will finish when - // the request is handled. The batch will probably be processed after - // the request has finished, and it would cause an error if the context - // is done. - ctx := c.Request.Context() - if async { - ctx = apm.DetachedContext(ctx) - } - // If there was an error decoding the body, then it Result.Err // will already be set. Reformat the error response. if c.Result.Err != nil { @@ -99,8 +75,7 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada var result elasticapm.Result err := handler.HandleStream( - ctx, - async, + c.Request.Context(), requestMetadataFunc(c), c.Request.Body, batchSize, @@ -194,8 +169,6 @@ func processStreamError(err error) (request.ResultID, jsonError) { err = errServerShuttingDown case errors.Is(err, publish.ErrFull): errID = request.IDResponseErrorsFullQueue - case errors.Is(err, elasticapm.ErrQueueFull): - errID = request.IDResponseErrorsFullQueue case errors.Is(err, errMethodNotAllowed): errID = request.IDResponseErrorsMethodNotAllowed case errors.Is(err, errInvalidContentType): @@ -250,11 +223,3 @@ type jsonError struct { Message string `json:"message"` Document string `json:"document,omitempty"` } - -func asyncRequest(req *http.Request) bool { - var async bool - if asyncStr := req.URL.Query().Get("async"); asyncStr != "" { - async, _ = strconv.ParseBool(asyncStr) - } - return async -} diff --git a/internal/beater/api/intake/handler_test.go b/internal/beater/api/intake/handler_test.go index 8cfa0c5a79d..4c52df49e83 100644 --- a/internal/beater/api/intake/handler_test.go +++ b/internal/beater/api/intake/handler_test.go @@ -106,13 +106,6 @@ func TestIntakeHandler(t *testing.T) { return publish.ErrChannelClosed }), code: http.StatusServiceUnavailable, id: request.IDResponseErrorsShuttingDown}, - "FullQueue": { - path: "errors.ndjson", - batchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { - return elasticapm.ErrQueueFull - }), - code: http.StatusServiceUnavailable, id: request.IDResponseErrorsFullQueue, - }, "FullQueueLegacy": { path: "errors.ndjson", batchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { @@ -176,7 +169,6 @@ func TestIntakeHandlerMonitoring(t *testing.T) { streamHandler := streamHandlerFunc(func( ctx context.Context, - async bool, base *modelpb.APMEvent, stream io.Reader, batchSize int, @@ -295,7 +287,6 @@ func emptyRequestMetadata(*request.Context) *modelpb.APMEvent { type streamHandlerFunc func( ctx context.Context, - async bool, base *modelpb.APMEvent, stream io.Reader, batchSize int, @@ -305,12 +296,11 @@ type streamHandlerFunc func( func (f streamHandlerFunc) HandleStream( ctx context.Context, - async bool, base *modelpb.APMEvent, stream io.Reader, batchSize int, processor modelpb.BatchProcessor, out *elasticapm.Result, ) error { - return f(ctx, async, base, stream, batchSize, processor, out) + return f(ctx, base, stream, batchSize, processor, out) } diff --git a/systemtest/intake.go b/systemtest/intake.go index c89dbe6df50..2ed1b48f3dc 100644 --- a/systemtest/intake.go +++ b/systemtest/intake.go @@ -47,11 +47,6 @@ func SendBackendEventsPayload(t *testing.T, serverURL string, payloadFile string return sendEventsPayload(t, serverURL, "/intake/v2/events", f) } -func SendBackendEventsAsyncPayload(t *testing.T, serverURL string, payloadFile string) { - f := openFile(t, payloadFile) - sendEventsPayload(t, serverURL, "/intake/v2/events?async=true", f) -} - func SendBackendEventsAsyncPayloadError(t *testing.T, serverURL string, payloadFile string) { f := openFile(t, payloadFile) diff --git a/systemtest/intake_async_test.go b/systemtest/intake_async_test.go deleted file mode 100644 index 9a8b56f9dde..00000000000 --- a/systemtest/intake_async_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "sync" - "testing" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" -) - -func TestIntakeAsync(t *testing.T) { - t.Run("semaphore_size_5", func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - // limit the maximum concurrent decoders to 5. This will allow to test - // for the successful case (no concurrency and unsuccessful case). - srv := apmservertest.NewServerTB(t, "-E", "apm-server.max_concurrent_decoders=5") - - systemtest.SendBackendEventsAsyncPayload(t, srv.URL, `../testdata/intake-v2/errors.ndjson`) - // Ensure the 5 errors are ingested. - estest.ExpectMinDocs(t, systemtest.Elasticsearch, 5, "logs-apm.error-*", nil) - - // Send a request with a lot of events (~1920) and expect errors to be - // returned, since the semaphore size is 5 (5 concurrent batches). - systemtest.SendBackendEventsAsyncPayloadError(t, srv.URL, `../testdata/intake-v2/heavy.ndjson`) - - // Create 4 requests to be run concurrently and ensure that they return with - // a 503 - var wg sync.WaitGroup - for i := 0; i < 4; i++ { - wg.Add(1) - go func() { - defer wg.Done() - systemtest.SendBackendEventsAsyncPayloadError(t, srv.URL, `../testdata/intake-v2/heavy.ndjson`) - }() - } - wg.Wait() - }) - t.Run("semaphore_size_default", func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServerTB(t) - - systemtest.SendBackendEventsAsyncPayload(t, srv.URL, `../testdata/intake-v2/errors.ndjson`) - // Ensure the 5 errors are ingested. - estest.ExpectMinDocs(t, systemtest.Elasticsearch, 5, "logs-apm.error-*", nil) - - // Send a request with a lot of events (~1920) and expect it to be processed - // without any errors. - systemtest.SendBackendEventsPayload(t, srv.URL, `../testdata/intake-v2/heavy.ndjson`) - - // Create 10 requests to be run concurrently and ensure that they succeed. - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - // ratelimit.ndjson contains 20 events, which means that with - // the default batch size of 10 it'll acquire the lock twice. - systemtest.SendBackendEventsAsyncPayload(t, srv.URL, `../testdata/intake-v2/ratelimit.ndjson`) - }() - } - wg.Wait() - }) -}