Skip to content

Commit

Permalink
remove-async-handling (#12386)
Browse files Browse the repository at this point in the history
* update apm-data to latest main

* remove test case

* update notice

* update apm-data to latest version

* remove async

* update apm-data + tidy

* additional fixes

* build: do not bump unrelated libraries

* build: update notice file

* test: remove async intake system tests

---------

Co-authored-by: Silvia Mitter <[email protected]>
Co-authored-by: kruskall <[email protected]>
  • Loading branch information
3 people authored Jan 15, 2024
1 parent 655ca80 commit 52849ff
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 138 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ these terms.

--------------------------------------------------------------------------------
Dependency : github.com/elastic/apm-data
Version: v0.1.1-0.20231212041654-b2a4dabeb6e3
Version: v0.1.1-0.20240111111310-80b6af8d97e1
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].20231212041654-b2a4dabeb6e3/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].20240111111310-80b6af8d97e1/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b
github.com/dustin/go-humanize v1.0.1
github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6
github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3
github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd
github.com/elastic/elastic-agent-client/v7 v7.6.0
github.com/elastic/elastic-agent-libs v0.7.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6 h1:Js+C3HEE0a5BDFmhEmJV/Uo4uzj/paHjd7yl6+KYguw=
github.com/elastic/apm-aggregation v0.0.0-20230815024520-e75a37d9ddd6/go.mod h1:ba3gaJCuhxXN/O5AuiI56xxd6DukQdVOK0NfpzBntNo=
github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3 h1:pavWRIAjsPregjeLBOFlm6UBDbsVYXGuQh+ancXJNiw=
github.com/elastic/apm-data v0.1.1-0.20231212041654-b2a4dabeb6e3/go.mod h1:z4iJVl8vyQa5v5o7UapWGHTsycBKsKfJfILuf2TZpYo=
github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1 h1:Wvt3GnXk1KzWeC8E/PRpQgbHAQDQmFvc7wJlnW9viCk=
github.com/elastic/apm-data v0.1.1-0.20240111111310-80b6af8d97e1/go.mod h1:z4iJVl8vyQa5v5o7UapWGHTsycBKsKfJfILuf2TZpYo=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd h1:rC9i9rR72M3X3w11rINvTnJ12WsRu4wHlMBswfZjdAs=
github.com/elastic/beats/v7 v7.0.0-alpha2.0.20240108094715-95f0f85a3edd/go.mod h1:rNNO1YXUHEa0qVE9csGluxw/OohHciGvMzAUAjK1uS4=
github.com/elastic/elastic-agent-autodiscover v0.6.6 h1:P1y0dDpbhJc7Uw/xe85irPEad4Vljygc+y4iSxtqW7A=
Expand Down
37 changes: 1 addition & 36 deletions internal/beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"strings"

"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/input/elasticapm"
Expand Down Expand Up @@ -69,27 +66,6 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada
return
}

// Async can be set by clients to request non-blocking event processing,
// returning immediately with an error `elasticapm.ErrQueueFull` when it
// can't be serviced.
//
// Async processing has weaker guarantees for the client since any
// errors while processing the batch cannot be communicated back to the
// client.
//
// Instead, errors are logged by the APM Server.
async := asyncRequest(c.Request)

// Create a new detached context when asynchronous processing is set,
// decoupling the context from its deadline, which will finish when
// the request is handled. The batch will probably be processed after
// the request has finished, and it would cause an error if the context
// is done.
ctx := c.Request.Context()
if async {
ctx = apm.DetachedContext(ctx)
}

// If there was an error decoding the body, then it Result.Err
// will already be set. Reformat the error response.
if c.Result.Err != nil {
Expand All @@ -99,8 +75,7 @@ func Handler(handler elasticapm.StreamHandler, requestMetadataFunc RequestMetada

var result elasticapm.Result
err := handler.HandleStream(
ctx,
async,
c.Request.Context(),
requestMetadataFunc(c),
c.Request.Body,
batchSize,
Expand Down Expand Up @@ -194,8 +169,6 @@ func processStreamError(err error) (request.ResultID, jsonError) {
err = errServerShuttingDown
case errors.Is(err, publish.ErrFull):
errID = request.IDResponseErrorsFullQueue
case errors.Is(err, elasticapm.ErrQueueFull):
errID = request.IDResponseErrorsFullQueue
case errors.Is(err, errMethodNotAllowed):
errID = request.IDResponseErrorsMethodNotAllowed
case errors.Is(err, errInvalidContentType):
Expand Down Expand Up @@ -250,11 +223,3 @@ type jsonError struct {
Message string `json:"message"`
Document string `json:"document,omitempty"`
}

func asyncRequest(req *http.Request) bool {
var async bool
if asyncStr := req.URL.Query().Get("async"); asyncStr != "" {
async, _ = strconv.ParseBool(asyncStr)
}
return async
}
12 changes: 1 addition & 11 deletions internal/beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ func TestIntakeHandler(t *testing.T) {
return publish.ErrChannelClosed
}),
code: http.StatusServiceUnavailable, id: request.IDResponseErrorsShuttingDown},
"FullQueue": {
path: "errors.ndjson",
batchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error {
return elasticapm.ErrQueueFull
}),
code: http.StatusServiceUnavailable, id: request.IDResponseErrorsFullQueue,
},
"FullQueueLegacy": {
path: "errors.ndjson",
batchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error {
Expand Down Expand Up @@ -176,7 +169,6 @@ func TestIntakeHandlerMonitoring(t *testing.T) {

streamHandler := streamHandlerFunc(func(
ctx context.Context,
async bool,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
Expand Down Expand Up @@ -295,7 +287,6 @@ func emptyRequestMetadata(*request.Context) *modelpb.APMEvent {

type streamHandlerFunc func(
ctx context.Context,
async bool,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
Expand All @@ -305,12 +296,11 @@ type streamHandlerFunc func(

func (f streamHandlerFunc) HandleStream(
ctx context.Context,
async bool,
base *modelpb.APMEvent,
stream io.Reader,
batchSize int,
processor modelpb.BatchProcessor,
out *elasticapm.Result,
) error {
return f(ctx, async, base, stream, batchSize, processor, out)
return f(ctx, base, stream, batchSize, processor, out)
}
5 changes: 0 additions & 5 deletions systemtest/intake.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ func SendBackendEventsPayload(t *testing.T, serverURL string, payloadFile string
return sendEventsPayload(t, serverURL, "/intake/v2/events", f)
}

func SendBackendEventsAsyncPayload(t *testing.T, serverURL string, payloadFile string) {
f := openFile(t, payloadFile)
sendEventsPayload(t, serverURL, "/intake/v2/events?async=true", f)
}

func SendBackendEventsAsyncPayloadError(t *testing.T, serverURL string, payloadFile string) {
f := openFile(t, payloadFile)

Expand Down
81 changes: 0 additions & 81 deletions systemtest/intake_async_test.go

This file was deleted.

0 comments on commit 52849ff

Please sign in to comment.