diff --git a/apm-lambda-extension/extension/apm_server.go b/apm-lambda-extension/extension/apm_server_transport.go similarity index 50% rename from apm-lambda-extension/extension/apm_server.go rename to apm-lambda-extension/extension/apm_server_transport.go index 5c61806c..9b519098 100644 --- a/apm-lambda-extension/extension/apm_server.go +++ b/apm-lambda-extension/extension/apm_server_transport.go @@ -32,14 +32,10 @@ import ( "time" ) -var bufferPool = sync.Pool{New: func() interface{} { - return &bytes.Buffer{} -}} - -type ApmServerTransportStatusType string - // Constants for the state of the transport used in // the backoff implementation. +type ApmServerTransportStatusType string + const ( Failing ApmServerTransportStatusType = "Failing" Pending ApmServerTransportStatusType = "Pending" @@ -48,19 +44,73 @@ const ( // A struct to track the state and status of sending // to the APM server. Used in the backoff implementation. -type ApmServerTransportStateType struct { +type ApmServerTransport struct { sync.Mutex - Status ApmServerTransportStatusType - ReconnectionCount int - GracePeriodTimer *time.Timer + bufferPool sync.Pool + config *extensionConfig + AgentDoneSignal chan struct{} + dataChannel chan AgentData + client *http.Client + status ApmServerTransportStatusType + reconnectionCount int + gracePeriodTimer *time.Timer } -// The status of transport to the APM server. -// -// This instance of the ApmServerTransportStateType is public for use in tests. -var ApmServerTransportState = ApmServerTransportStateType{ - Status: Healthy, - ReconnectionCount: -1, +func InitApmServerTransport(config *extensionConfig) *ApmServerTransport { + var transport ApmServerTransport + transport.bufferPool = sync.Pool{New: func() interface{} { + return &bytes.Buffer{} + }} + transport.dataChannel = make(chan AgentData, 100) + transport.client = &http.Client{ + Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second, + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + transport.config = config + transport.status = Healthy + transport.reconnectionCount = -1 + return &transport +} + +// StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server. +// Stop checking for, and sending agent data when the function invocation +// has completed, signaled via a channel. +func (transport *ApmServerTransport) ForwardApmData(ctx context.Context) error { + if transport.status == Failing { + return nil + } + for { + select { + case <-ctx.Done(): + Log.Debug("Invocation context cancelled, not processing any more agent data") + return nil + case agentData := <-transport.dataChannel: + if err := transport.PostToApmServer(ctx, agentData); err != nil { + return fmt.Errorf("error sending to APM server, skipping: %v", err) + } + } + } +} + +// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. +func (transport *ApmServerTransport) FlushAPMData(ctx context.Context) { + if transport.status == Failing { + Log.Debug("Flush skipped - Transport failing") + return + } + Log.Debug("Flush started - Checking for agent data") + for { + select { + case agentData := <-transport.dataChannel: + Log.Debug("Flush in progress - Processing agent data") + if err := transport.PostToApmServer(ctx, agentData); err != nil { + Log.Errorf("Error sending to APM server, skipping: %v", err) + } + default: + Log.Debug("Flush ended - No agent data on buffer") + return + } + } } // PostToApmServer takes a chunk of APM agent data and posts it to the APM server. @@ -68,10 +118,10 @@ var ApmServerTransportState = ApmServerTransportStateType{ // The function compresses the APM agent data, if it's not already compressed. // It sets the APM transport status to failing upon errors, as part of the backoff // strategy. -func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error { +func (transport *ApmServerTransport) PostToApmServer(ctx context.Context, agentData AgentData) error { // todo: can this be a streaming or streaming style call that keeps the // connection open across invocations? - if !IsTransportStatusHealthyOrPending() { + if transport.status == Failing { return errors.New("transport status is unhealthy") } @@ -83,10 +133,10 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension r = bytes.NewReader(agentData.Data) } else { encoding = "gzip" - buf := bufferPool.Get().(*bytes.Buffer) + buf := transport.bufferPool.Get().(*bytes.Buffer) defer func() { buf.Reset() - bufferPool.Put(buf) + transport.bufferPool.Put(buf) }() gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed) if err != nil { @@ -101,22 +151,22 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension r = buf } - req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, r) + req, err := http.NewRequest("POST", transport.config.apmServerUrl+endpointURI, r) if err != nil { return fmt.Errorf("failed to create a new request when posting to APM server: %v", err) } req.Header.Add("Content-Encoding", encoding) req.Header.Add("Content-Type", "application/x-ndjson") - if config.apmServerApiKey != "" { - req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey) - } else if config.apmServerSecretToken != "" { - req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken) + if transport.config.apmServerApiKey != "" { + req.Header.Add("Authorization", "ApiKey "+transport.config.apmServerApiKey) + } else if transport.config.apmServerSecretToken != "" { + req.Header.Add("Authorization", "Bearer "+transport.config.apmServerSecretToken) } - Log.Debug("Sending data chunk to APM Server") - resp, err := client.Do(req) + Log.Debug("Sending data chunk to APM server") + resp, err := transport.client.Do(req) if err != nil { - SetApmServerTransportState(Failing, ctx) + transport.SetApmServerTransportState(ctx, Failing) return fmt.Errorf("failed to post to APM server: %v", err) } @@ -124,25 +174,17 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - SetApmServerTransportState(Failing, ctx) + transport.SetApmServerTransportState(ctx, Failing) return fmt.Errorf("failed to read the response body after posting to the APM server") } - SetApmServerTransportState(Healthy, ctx) + transport.SetApmServerTransportState(ctx, Healthy) Log.Debug("Transport status set to healthy") Log.Debugf("APM server response body: %v", string(body)) Log.Debugf("APM server response status code: %v", resp.StatusCode) return nil } -// IsTransportStatusHealthyOrPending returns true if the APM server transport status is -// healthy or pending, and false otherwise. -// -// This function is public for use in tests. -func IsTransportStatusHealthyOrPending() bool { - return ApmServerTransportState.Status != Failing -} - // SetApmServerTransportState takes a state of the APM server transport and updates // the current state of the transport. For a change to a failing state, the grace period // is calculated and a go routine is started that waits for that period to complete @@ -150,49 +192,49 @@ func IsTransportStatusHealthyOrPending() bool { // to the APM server. // // This function is public for use in tests. -func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) { +func (transport *ApmServerTransport) SetApmServerTransportState(ctx context.Context, status ApmServerTransportStatusType) { switch status { case Healthy: - ApmServerTransportState.Lock() - ApmServerTransportState.Status = status - Log.Debugf("APM Server Transport status set to %s", status) - ApmServerTransportState.ReconnectionCount = -1 - ApmServerTransportState.Unlock() + transport.Lock() + transport.status = status + Log.Debugf("APM server Transport status set to %s", transport.status) + transport.reconnectionCount = -1 + transport.Unlock() case Failing: - ApmServerTransportState.Lock() - ApmServerTransportState.Status = status - Log.Debugf("APM Server Transport status set to %s", status) - ApmServerTransportState.ReconnectionCount++ - ApmServerTransportState.GracePeriodTimer = time.NewTimer(computeGracePeriod()) - Log.Debugf("Grace period entered, reconnection count : %d", ApmServerTransportState.ReconnectionCount) + transport.Lock() + transport.status = status + Log.Debugf("APM server Transport status set to %s", transport.status) + transport.reconnectionCount++ + transport.gracePeriodTimer = time.NewTimer(transport.computeGracePeriod()) + Log.Debugf("Grace period entered, reconnection count : %d", transport.reconnectionCount) go func() { select { - case <-ApmServerTransportState.GracePeriodTimer.C: + case <-transport.gracePeriodTimer.C: Log.Debug("Grace period over - timer timed out") case <-ctx.Done(): Log.Debug("Grace period over - context done") } - ApmServerTransportState.Status = Pending - Log.Debugf("APM Server Transport status set to %s", status) - ApmServerTransportState.Unlock() + transport.status = Pending + Log.Debugf("APM server Transport status set to %s", transport.status) + transport.Unlock() }() default: - Log.Errorf("Cannot set APM Server Transport status to %s", status) + Log.Errorf("Cannot set APM server Transport status to %s", status) } } // ComputeGracePeriod https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors -func computeGracePeriod() time.Duration { - gracePeriodWithoutJitter := math.Pow(math.Min(float64(ApmServerTransportState.ReconnectionCount), 6), 2) +func (transport *ApmServerTransport) computeGracePeriod() time.Duration { + gracePeriodWithoutJitter := math.Pow(math.Min(float64(transport.reconnectionCount), 6), 2) jitter := rand.Float64()/5 - 0.1 return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second)) } // EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send // to the APM server. -func EnqueueAPMData(agentDataChannel chan AgentData, agentData AgentData) { +func (transport *ApmServerTransport) EnqueueAPMData(agentData AgentData) { select { - case agentDataChannel <- agentData: + case transport.dataChannel <- agentData: Log.Debug("Adding agent data to buffer to be sent to apm server") default: Log.Warn("Channel full: dropping a subset of agent data") diff --git a/apm-lambda-extension/extension/apm_server_test.go b/apm-lambda-extension/extension/apm_server_transport_test.go similarity index 78% rename from apm-lambda-extension/extension/apm_server_test.go rename to apm-lambda-extension/extension/apm_server_transport_test.go index df7b2ac5..5362e588 100644 --- a/apm-lambda-extension/extension/apm_server_test.go +++ b/apm-lambda-extension/extension/apm_server_transport_test.go @@ -29,6 +29,7 @@ import ( ) func TestPostToApmServerDataCompressed(t *testing.T) { + s := "A long time ago in a galaxy far, far away..." // Compress the data @@ -68,8 +69,8 @@ func TestPostToApmServerDataCompressed(t *testing.T) { config := extensionConfig{ apmServerUrl: apmServer.URL + "/", } - - err := PostToApmServer(apmServer.Client(), agentData, &config, context.Background()) + transport := InitApmServerTransport(&config) + err := transport.PostToApmServer(context.Background(), agentData) assert.Equal(t, nil, err) } @@ -113,88 +114,94 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) { config := extensionConfig{ apmServerUrl: apmServer.URL + "/", } - - err := PostToApmServer(apmServer.Client(), agentData, &config, context.Background()) + transport := InitApmServerTransport(&config) + err := transport.PostToApmServer(context.Background(), agentData) assert.Equal(t, nil, err) } func TestGracePeriod(t *testing.T) { - ApmServerTransportState.ReconnectionCount = 0 - val0 := computeGracePeriod().Seconds() + transport := InitApmServerTransport(&extensionConfig{}) + + transport.reconnectionCount = 0 + val0 := transport.computeGracePeriod().Seconds() assert.Equal(t, val0, float64(0)) - ApmServerTransportState.ReconnectionCount = 1 - val1 := computeGracePeriod().Seconds() + transport.reconnectionCount = 1 + val1 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val1, float64(1), 0.1*1) - ApmServerTransportState.ReconnectionCount = 2 - val2 := computeGracePeriod().Seconds() + transport.reconnectionCount = 2 + val2 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val2, float64(4), 0.1*4) - ApmServerTransportState.ReconnectionCount = 3 - val3 := computeGracePeriod().Seconds() + transport.reconnectionCount = 3 + val3 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val3, float64(9), 0.1*9) - ApmServerTransportState.ReconnectionCount = 4 - val4 := computeGracePeriod().Seconds() + transport.reconnectionCount = 4 + val4 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val4, float64(16), 0.1*16) - ApmServerTransportState.ReconnectionCount = 5 - val5 := computeGracePeriod().Seconds() + transport.reconnectionCount = 5 + val5 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val5, float64(25), 0.1*25) - ApmServerTransportState.ReconnectionCount = 6 - val6 := computeGracePeriod().Seconds() + transport.reconnectionCount = 6 + val6 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val6, float64(36), 0.1*36) - ApmServerTransportState.ReconnectionCount = 7 - val7 := computeGracePeriod().Seconds() + transport.reconnectionCount = 7 + val7 := transport.computeGracePeriod().Seconds() assert.InDelta(t, val7, float64(36), 0.1*36) } func TestSetHealthyTransport(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - assert.True(t, ApmServerTransportState.Status == Healthy) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, -1) + transport := InitApmServerTransport(&extensionConfig{}) + transport.SetApmServerTransportState(context.Background(), Healthy) + assert.True(t, transport.status == Healthy) + assert.Equal(t, transport.reconnectionCount, -1) } func TestSetFailingTransport(t *testing.T) { // By explicitly setting the reconnection count to 0, we ensure that the grace period will not be 0 // and avoid a race between reaching the pending status and the test assertion. - ApmServerTransportState.ReconnectionCount = 0 - SetApmServerTransportState(Failing, context.Background()) - assert.True(t, ApmServerTransportState.Status == Failing) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, 1) + transport := InitApmServerTransport(&extensionConfig{}) + transport.reconnectionCount = 0 + transport.SetApmServerTransportState(context.Background(), Failing) + assert.True(t, transport.status == Failing) + assert.Equal(t, transport.reconnectionCount, 1) } func TestSetPendingTransport(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState(Failing, context.Background()) + transport := InitApmServerTransport(&extensionConfig{}) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), Failing) for { - if IsTransportStatusHealthyOrPending() { + if transport.status != Failing { break } } - assert.True(t, ApmServerTransportState.Status == Pending) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, 0) + assert.True(t, transport.status == Pending) + assert.Equal(t, transport.reconnectionCount, 0) } func TestSetPendingTransportExplicitly(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState(Pending, context.Background()) - assert.True(t, ApmServerTransportState.Status == Healthy) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, -1) + transport := InitApmServerTransport(&extensionConfig{}) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), Pending) + assert.True(t, transport.status == Healthy) + assert.Equal(t, transport.reconnectionCount, -1) } func TestSetInvalidTransport(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState("Invalid", context.Background()) - assert.True(t, ApmServerTransportState.Status == Healthy) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, -1) + transport := InitApmServerTransport(&extensionConfig{}) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), "Invalid") + assert.True(t, transport.status == Healthy) + assert.Equal(t, transport.reconnectionCount, -1) } func TestEnterBackoffFromHealthy(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) // Compress the data pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) @@ -226,31 +233,24 @@ func TestEnterBackoffFromHealthy(t *testing.T) { return } })) - // Close the APM server early so that POST requests fail and that backoff is enabled - apmServer.Close() - config := extensionConfig{ apmServerUrl: apmServer.URL + "/", } + transport := InitApmServerTransport(&config) + transport.SetApmServerTransportState(context.Background(), Healthy) - if err := PostToApmServer(apmServer.Client(), agentData, &config, context.Background()); err != nil { + // Close the APM server early so that POST requests fail and that backoff is enabled + apmServer.Close() + + if err := transport.PostToApmServer(context.Background(), agentData); err != nil { return } // No way to know for sure if failing or pending (0 sec grace period) - assert.True(t, ApmServerTransportState.Status != Healthy) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, 0) + assert.True(t, transport.status != Healthy) + assert.Equal(t, transport.reconnectionCount, 0) } func TestEnterBackoffFromFailing(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState(Failing, context.Background()) - for { - if IsTransportStatusHealthyOrPending() { - break - } - } - assert.Equal(t, ApmServerTransportState.Status, Pending) - // Compress the data pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) @@ -290,21 +290,22 @@ func TestEnterBackoffFromFailing(t *testing.T) { apmServerUrl: apmServer.URL + "/", } - assert.Error(t, PostToApmServer(apmServer.Client(), agentData, &config, context.Background())) - assert.Equal(t, ApmServerTransportState.Status, Failing) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, 1) -} - -func TestAPMServerRecovery(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState(Failing, context.Background()) + transport := InitApmServerTransport(&config) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), Failing) for { - if IsTransportStatusHealthyOrPending() { + if transport.status != Failing { break } } - assert.Equal(t, ApmServerTransportState.Status, Pending) + assert.Equal(t, transport.status, Pending) + assert.Error(t, transport.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, transport.status, Failing) + assert.Equal(t, transport.reconnectionCount, 1) +} + +func TestAPMServerRecovery(t *testing.T) { // Compress the data pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) @@ -342,21 +343,22 @@ func TestAPMServerRecovery(t *testing.T) { apmServerUrl: apmServer.URL + "/", } - assert.NoError(t, PostToApmServer(apmServer.Client(), agentData, &config, context.Background())) - assert.Equal(t, ApmServerTransportState.Status, Healthy) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, -1) -} - -func TestContinuedAPMServerFailure(t *testing.T) { - SetApmServerTransportState(Healthy, context.Background()) - SetApmServerTransportState(Failing, context.Background()) + transport := InitApmServerTransport(&config) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), Failing) for { - if IsTransportStatusHealthyOrPending() { + if transport.status != Failing { break } } - assert.Equal(t, ApmServerTransportState.Status, Pending) + assert.Equal(t, transport.status, Pending) + assert.NoError(t, transport.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, transport.status, Healthy) + assert.Equal(t, transport.reconnectionCount, -1) +} + +func TestContinuedAPMServerFailure(t *testing.T) { // Compress the data pr, pw := io.Pipe() gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed) @@ -385,31 +387,30 @@ func TestContinuedAPMServerFailure(t *testing.T) { assert.Equal(t, string(data), string(bytes)) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) if _, err := w.Write([]byte(`{"foo": "bar"}`)); err != nil { - t.Fail() return } })) - apmServer.Close() // Close the APM server early so that POST requests fail and that backoff is enabled + apmServer.Close() config := extensionConfig{ apmServerUrl: apmServer.URL + "/", } - assert.Error(t, PostToApmServer(apmServer.Client(), agentData, &config, context.Background())) - assert.Equal(t, ApmServerTransportState.Status, Failing) - assert.Equal(t, ApmServerTransportState.ReconnectionCount, 1) + transport := InitApmServerTransport(&config) + transport.SetApmServerTransportState(context.Background(), Healthy) + transport.SetApmServerTransportState(context.Background(), Failing) + for { + if transport.status != Failing { + break + } + } + assert.Equal(t, transport.status, Pending) + assert.Error(t, transport.PostToApmServer(context.Background(), agentData)) + assert.Equal(t, transport.status, Failing) + assert.Equal(t, transport.reconnectionCount, 1) } func BenchmarkPostToAPM(b *testing.B) { - // Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson. - benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} -{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} -{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} -{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} -{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} -{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } -`) - agentData := AgentData{Data: benchBody, ContentEncoding: ""} // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -428,12 +429,21 @@ func BenchmarkPostToAPM(b *testing.B) { apmServerUrl: apmServer.URL + "/", } - client := &http.Client{ - Transport: http.DefaultTransport.(*http.Transport).Clone(), - } + transport := InitApmServerTransport(&config) + + // Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson. + benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}} +{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}} +{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}} +{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}} +{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}} +{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } } +`) + agentData := AgentData{Data: benchBody, ContentEncoding: ""} + b.ResetTimer() for i := 0; i < b.N; i++ { - if err := PostToApmServer(client, agentData, &config, context.Background()); err != nil { + if err := transport.PostToApmServer(context.Background(), agentData); err != nil { b.Fatal(err) } } diff --git a/apm-lambda-extension/extension/client.go b/apm-lambda-extension/extension/client.go index 1ce3354f..fd8d41d5 100644 --- a/apm-lambda-extension/extension/client.go +++ b/apm-lambda-extension/extension/client.go @@ -67,14 +67,14 @@ const ( extensionErrorType = "Lambda-Extension-Function-Error-Type" ) -// Client is a simple client for the Lambda Extensions API +// Client is a simple Client for the Lambda Extensions API type Client struct { baseURL string httpClient *http.Client ExtensionID string } -// NewClient returns a Lambda Extensions API client +// NewClient returns a Lambda Extensions API Client func NewClient(awsLambdaRuntimeAPI string) *Client { baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI) return &Client{ diff --git a/apm-lambda-extension/extension/http_server.go b/apm-lambda-extension/extension/http_server.go index 3415994b..6794f5ff 100644 --- a/apm-lambda-extension/extension/http_server.go +++ b/apm-lambda-extension/extension/http_server.go @@ -24,36 +24,34 @@ import ( "time" ) -var agentDataServer *http.Server - // StartHttpServer starts the server listening for APM agent data. -func StartHttpServer(ctx context.Context, agentDataChan chan AgentData, config *extensionConfig) (err error) { +func StartHttpServer(ctx context.Context, transport *ApmServerTransport) (agentDataServer *http.Server, err error) { mux := http.NewServeMux() - mux.HandleFunc("/", handleInfoRequest(ctx, config.apmServerUrl, config)) - mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan)) - timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second - agentDataServer = &http.Server{ - Addr: config.dataReceiverServerPort, + mux.HandleFunc("/", handleInfoRequest(ctx, transport)) + mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(transport)) + timeout := time.Duration(transport.config.dataReceiverTimeoutSeconds) * time.Second + server := &http.Server{ + Addr: transport.config.dataReceiverServerPort, Handler: mux, ReadTimeout: timeout, WriteTimeout: timeout, MaxHeaderBytes: 1 << 20, } - ln, err := net.Listen("tcp", agentDataServer.Addr) + ln, err := net.Listen("tcp", server.Addr) if err != nil { return } go func() { - Log.Infof("Extension listening for apm data on %s", agentDataServer.Addr) - if err = agentDataServer.Serve(ln); err != nil { - if err.Error() == "http: Server closed" { + Log.Infof("Extension listening for apm data on %s", server.Addr) + if err = server.Serve(ln); err != nil { + if err.Error() == "http: server closed" { Log.Debug(err) } else { Log.Errorf("Error upon APM data server start : %v", err) } } }() - return nil + return server, nil } diff --git a/apm-lambda-extension/extension/http_server_test.go b/apm-lambda-extension/extension/http_server_test.go index 4c08cfec..190a65fe 100644 --- a/apm-lambda-extension/extension/http_server_test.go +++ b/apm-lambda-extension/extension/http_server_test.go @@ -51,7 +51,6 @@ func TestInfoProxy(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", @@ -59,8 +58,9 @@ func TestInfoProxy(t *testing.T) { dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } - - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + transport := InitApmServerTransport(&config) + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -100,7 +100,6 @@ func TestInfoProxyErrorStatusCode(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerUrl: apmServer.URL, apmServerSecretToken: "foo", @@ -108,8 +107,10 @@ func TestInfoProxyErrorStatusCode(t *testing.T) { dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } + transport := InitApmServerTransport(&config) - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -144,16 +145,17 @@ func Test_handleInfoRequest(t *testing.T) { ` // Create extension config - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerSecretToken: "foo", apmServerApiKey: "bar", dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } + transport := InitApmServerTransport(&config) // Start extension server - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -189,10 +191,10 @@ func (errReader) Read(_ []byte) (int, error) { } func Test_handleInfoRequestInvalidBody(t *testing.T) { - testChan := make(chan AgentData) + transport := InitApmServerTransport(&extensionConfig{}) mux := http.NewServeMux() urlPath := "/intake/v2/events" - mux.HandleFunc(urlPath, handleIntakeV2Events(testChan)) + mux.HandleFunc(urlPath, handleIntakeV2Events(transport)) req := httptest.NewRequest(http.MethodGet, urlPath, errReader(0)) recorder := httptest.NewRecorder() @@ -203,22 +205,21 @@ func Test_handleInfoRequestInvalidBody(t *testing.T) { func Test_handleIntakeV2EventsQueryParam(t *testing.T) { body := []byte(`{"metadata": {}`) - AgentDoneSignal = make(chan struct{}) - // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { })) defer apmServer.Close() // Create extension config and start the server - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerUrl: apmServer.URL, dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } - - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + transport := InitApmServerTransport(&config) + transport.AgentDoneSignal = make(chan struct{}, 1) + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -246,8 +247,8 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) { defer timer.Stop() select { - case <-AgentDoneSignal: - <-dataChannel + case <-transport.AgentDoneSignal: + <-transport.dataChannel case <-timer.C: t.Log("Timed out waiting for server to send FuncDone signal") t.Fail() @@ -263,14 +264,15 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { defer apmServer.Close() // Create extension config and start the server - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerUrl: apmServer.URL, dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } - - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + transport := InitApmServerTransport(&config) + transport.AgentDoneSignal = make(chan struct{}, 1) + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -292,29 +294,28 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) { t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err) t.Fail() } - <-dataChannel + <-transport.dataChannel assert.Equal(t, 202, resp.StatusCode) } func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { body := []byte(``) - AgentDoneSignal = make(chan struct{}) - // Create apm server and handler apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { })) defer apmServer.Close() // Create extension config and start the server - dataChannel := make(chan AgentData, 100) config := extensionConfig{ apmServerUrl: apmServer.URL, dataReceiverServerPort: ":1234", dataReceiverTimeoutSeconds: 15, } - - if err := StartHttpServer(context.Background(), dataChannel, &config); err != nil { + transport := InitApmServerTransport(&config) + transport.AgentDoneSignal = make(chan struct{}, 1) + agentDataServer, err := StartHttpServer(context.Background(), transport) + if err != nil { t.Fail() return } @@ -342,7 +343,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) { defer timer.Stop() select { - case <-AgentDoneSignal: + case <-transport.AgentDoneSignal: case <-timer.C: t.Log("Timed out waiting for server to send FuncDone signal") t.Fail() diff --git a/apm-lambda-extension/extension/route_handlers.go b/apm-lambda-extension/extension/route_handlers.go index d813f967..ab7d9f0f 100644 --- a/apm-lambda-extension/extension/route_handlers.go +++ b/apm-lambda-extension/extension/route_handlers.go @@ -31,18 +31,14 @@ type AgentData struct { ContentEncoding string } -var AgentDoneSignal chan struct{} -var mainExtensionContext context.Context - // URL: http://server/ -func handleInfoRequest(ctx context.Context, apmServerUrl string, config *extensionConfig) func(w http.ResponseWriter, r *http.Request) { +func handleInfoRequest(ctx context.Context, apmServerTransport *ApmServerTransport) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { - Log.Debug("Handling APM Server Info Request") - mainExtensionContext = ctx + Log.Debug("Handling APM server Info Request") // Init reverse proxy - parsedApmServerUrl, err := url.Parse(apmServerUrl) + parsedApmServerUrl, err := url.Parse(apmServerTransport.config.apmServerUrl) if err != nil { Log.Errorf("could not parse APM server URL: %v", err) return @@ -50,12 +46,15 @@ func handleInfoRequest(ctx context.Context, apmServerUrl string, config *extensi reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerUrl) - reverseProxyTimeout := time.Duration(config.DataForwarderTimeoutSeconds) * time.Second + reverseProxyTimeout := time.Duration(apmServerTransport.config.DataForwarderTimeoutSeconds) * time.Second customTransport := http.DefaultTransport.(*http.Transport).Clone() customTransport.ResponseHeaderTimeout = reverseProxyTimeout reverseProxy.Transport = customTransport - reverseProxy.ErrorHandler = reverseProxyErrorHandler + reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + apmServerTransport.SetApmServerTransportState(ctx, Failing) + Log.Errorf("Error querying version from the APM server: %v", err) + } // Process request (the Golang doc suggests removing any pre-existing X-Forwarded-For header coming // from the client or an untrusted proxy to prevent IP spoofing : https://pkg.go.dev/net/http/httputil#ReverseProxy @@ -72,13 +71,8 @@ func handleInfoRequest(ctx context.Context, apmServerUrl string, config *extensi } } -func reverseProxyErrorHandler(res http.ResponseWriter, req *http.Request, err error) { - SetApmServerTransportState(Failing, mainExtensionContext) - Log.Errorf("Error querying version from the APM Server: %v", err) -} - // URL: http://server/intake/v2/events -func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) { +func handleIntakeV2Events(transport *ApmServerTransport) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { Log.Debug("Handling APM Data Intake") @@ -96,11 +90,11 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit ContentEncoding: r.Header.Get("Content-Encoding"), } - EnqueueAPMData(agentDataChan, agentData) + transport.EnqueueAPMData(agentData) } if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" { - AgentDoneSignal <- struct{}{} + transport.AgentDoneSignal <- struct{}{} } w.WriteHeader(http.StatusAccepted) diff --git a/apm-lambda-extension/extension/process_events.go b/apm-lambda-extension/extension/util.go similarity index 52% rename from apm-lambda-extension/extension/process_events.go rename to apm-lambda-extension/extension/util.go index c47e5644..e2388562 100644 --- a/apm-lambda-extension/extension/process_events.go +++ b/apm-lambda-extension/extension/util.go @@ -18,39 +18,9 @@ package extension import ( - "context" "encoding/json" - "net/http" ) -// ProcessShutdown processes the Shutdown event received from the -// Lambda runtime API. -func ProcessShutdown() { - Log.Info("Received SHUTDOWN event, exiting") - agentDataServer.Close() -} - -// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server. -func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig, ctx context.Context) { - if !IsTransportStatusHealthyOrPending() { - Log.Debug("Flush skipped - Transport unhealthy") - return - } - Log.Debug("Flush started - Checking for agent data") - for { - select { - case agentData := <-dataChannel: - Log.Debug("Flush in progress - Processing agent data") - if err := PostToApmServer(client, agentData, config, ctx); err != nil { - Log.Errorf("Error sending to APM server, skipping: %v", err) - } - default: - Log.Debug("Flush ended - No agent data on buffer") - return - } - } -} - // PrettyPrint prints formatted, legible json data. func PrettyPrint(v interface{}) string { data, err := json.MarshalIndent(v, "", "\t") diff --git a/apm-lambda-extension/logsapi/route_handlers.go b/apm-lambda-extension/logsapi/route_handlers.go index 31f52cf2..3da4c316 100644 --- a/apm-lambda-extension/logsapi/route_handlers.go +++ b/apm-lambda-extension/logsapi/route_handlers.go @@ -24,7 +24,7 @@ import ( "time" ) -func handleLogEventsRequest(out chan LogEvent) func(w http.ResponseWriter, r *http.Request) { +func handleLogEventsRequest(transport *LogsTransport) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { var logEvents []LogEvent @@ -40,7 +40,7 @@ func handleLogEventsRequest(out chan LogEvent) func(w http.ResponseWriter, r *ht w.WriteHeader(http.StatusInternalServerError) continue } - out <- logEvents[idx] + transport.logsChannel <- logEvents[idx] } } } diff --git a/apm-lambda-extension/logsapi/subscribe.go b/apm-lambda-extension/logsapi/subscribe.go index d1afb82a..3d844bd3 100644 --- a/apm-lambda-extension/logsapi/subscribe.go +++ b/apm-lambda-extension/logsapi/subscribe.go @@ -29,9 +29,23 @@ import ( "github.com/pkg/errors" ) -var ListenerHost = "sandbox" -var Server *http.Server -var Listener net.Listener +// TODO: Remove global variable and find another way to retrieve Logs Listener network info when testing main +// TestListenerAddr For e2e testing purposes +var TestListenerAddr net.Addr + +type LogsTransport struct { + logsChannel chan LogEvent + listener net.Listener + listenerHost string + server *http.Server +} + +func InitLogsTransport(listenerHost string) *LogsTransport { + var transport LogsTransport + transport.listenerHost = listenerHost + transport.logsChannel = make(chan LogEvent, 100) + return &transport +} // LogEvent represents an event received from the Logs API type LogEvent struct { @@ -48,7 +62,8 @@ type LogEventRecord struct { } // Subscribes to the Logs API -func subscribe(extensionID string, eventTypes []EventType) error { +func subscribe(transport *LogsTransport, extensionID string, eventTypes []EventType) error { + extensionsAPIAddress, ok := os.LookupEnv("AWS_LAMBDA_RUNTIME_API") if !ok { return errors.New("AWS_LAMBDA_RUNTIME_API is not set") @@ -60,48 +75,66 @@ func subscribe(extensionID string, eventTypes []EventType) error { return err } - _, port, _ := net.SplitHostPort(Listener.Addr().String()) - _, err = logsAPIClient.Subscribe(eventTypes, URI("http://"+ListenerHost+":"+port), extensionID) + _, port, _ := net.SplitHostPort(transport.listener.Addr().String()) + _, err = logsAPIClient.Subscribe(eventTypes, URI("http://"+transport.listenerHost+":"+port), extensionID) return err } // Subscribe starts the HTTP server listening for log events and subscribes to the Logs API -func Subscribe(ctx context.Context, extensionID string, eventTypes []EventType, out chan LogEvent) (err error) { +func Subscribe(ctx context.Context, extensionID string, eventTypes []EventType) (transport *LogsTransport, err error) { if checkAWSSamLocal() { - return errors.New("Detected sam local environment") + return nil, errors.New("Detected sam local environment") } - if err = startHTTPServer(out); err != nil { - return + + // Init APM server Transport struct + // Make channel for collecting logs and create a HTTP server to listen for them + if checkLambdaFunction() { + transport = InitLogsTransport("sandbox") + } else { + transport = InitLogsTransport("localhost") } - if err = subscribe(extensionID, eventTypes); err != nil { - return + if err = startHTTPServer(ctx, transport); err != nil { + return nil, err } - return nil + + if err = subscribe(transport, extensionID, eventTypes); err != nil { + return nil, err + } + return transport, nil } -func startHTTPServer(out chan LogEvent) error { +func startHTTPServer(ctx context.Context, transport *LogsTransport) error { mux := http.NewServeMux() - mux.HandleFunc("/", handleLogEventsRequest(out)) + mux.HandleFunc("/", handleLogEventsRequest(transport)) var err error - Server = &http.Server{ + transport.server = &http.Server{ Handler: mux, } - if Listener, err = net.Listen("tcp", ListenerHost+":0"); err != nil { + if transport.listener, err = net.Listen("tcp", transport.listenerHost+":0"); err != nil { return err } + TestListenerAddr = transport.listener.Addr() go func() { - extension.Log.Infof("Extension listening for Lambda Logs API events on %s", Listener.Addr().String()) - if err = Server.Serve(Listener); err != nil { + extension.Log.Infof("Extension listening for Lambda Logs API events on %s", transport.listener.Addr().String()) + if err = transport.server.Serve(transport.listener); err != nil { extension.Log.Errorf("Error upon Logs API server start : %v", err) } }() + + go func() { + <-ctx.Done() + transport.server.Close() + }() + return nil } +// checkAWSSamLocal checks if the extension is running in a SAM CLI container. +// The Logs API is not supported in that scenario. func checkAWSSamLocal() bool { envAWSLocal, ok := os.LookupEnv("AWS_SAM_LOCAL") if ok && envAWSLocal == "true" { @@ -110,3 +143,40 @@ func checkAWSSamLocal() bool { return false } + +// checkLambdaFunction checks if the extension is together with an actual Lambda function. +// It is currently used together with checkAWSSamLocal as a best effort solution to determine if +// the extension actually runs in dev (unit tests), SAM, or in a local Lambda environment. +func checkLambdaFunction() bool { + lambdaName, ok := os.LookupEnv("AWS_LAMBDA_FUNCTION_NAME") + if ok && lambdaName != "" { + return true + } + + return false +} + +// WaitRuntimeDone consumes events until a RuntimeDone event corresponding +// to requestID is received, or ctx is cancelled, and then returns. +func WaitRuntimeDone(ctx context.Context, requestID string, transport *LogsTransport, runtimeDoneSignal chan struct{}) error { + for { + select { + case logEvent := <-transport.logsChannel: + extension.Log.Debugf("Received log event %v", logEvent.Type) + // Check the logEvent for runtimeDone and compare the RequestID + // to the id that came in via the Next API + if logEvent.Type == RuntimeDone { + if logEvent.Record.RequestId == requestID { + extension.Log.Info("Received runtimeDone event for this function invocation") + runtimeDoneSignal <- struct{}{} + return nil + } else { + extension.Log.Debug("Log API runtimeDone event request id didn't match") + } + } + case <-ctx.Done(): + extension.Log.Debug("Current invocation over. Interrupting logs processing goroutine") + return nil + } + } +} diff --git a/apm-lambda-extension/logsapi/subscribe_test.go b/apm-lambda-extension/logsapi/subscribe_test.go index 8c8f4f0c..df1494fe 100644 --- a/apm-lambda-extension/logsapi/subscribe_test.go +++ b/apm-lambda-extension/logsapi/subscribe_test.go @@ -31,8 +31,6 @@ import ( ) func TestSubscribeWithSamLocalEnv(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() if err := os.Setenv("AWS_SAM_LOCAL", "true"); err != nil { t.Fail() } @@ -41,17 +39,27 @@ func TestSubscribeWithSamLocalEnv(t *testing.T) { t.Fail() } }) - out := make(chan LogEvent) - err := Subscribe(ctx, "testID", []EventType{Platform}, out) + _, err := Subscribe(context.Background(), "testID", []EventType{Platform}) assert.Error(t, err) } +func TestSubscribeWithLambdaFunction(t *testing.T) { + if err := os.Setenv("AWS_LAMBDA_FUNCTION_NAME", "mock"); err != nil { + t.Fail() + } + t.Cleanup(func() { + if err := os.Unsetenv("AWS_LAMBDA_FUNCTION_NAME"); err != nil { + t.Fail() + } + }) + + _, err := Subscribe(context.Background(), "testID", []EventType{Platform}) + assert.Error(t, err, "listen tcp: lookup sandbox: no such host") +} + func TestSubscribeAWSRequest(t *testing.T) { - ListenerHost = "localhost" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - out := make(chan LogEvent, 1) + // For subscription request expectedTypes := []EventType{Platform} expectedBufferingCfg := BufferingCfg{ @@ -77,12 +85,13 @@ func TestSubscribeAWSRequest(t *testing.T) { } // Subscribe to the logs api and start the http server listening for events - if err := Subscribe(ctx, "testID", []EventType{Platform}, out); err != nil { + transport, err := Subscribe(context.Background(), "testID", []EventType{Platform}) + if err != nil { t.Logf("Error subscribing, %v", err) t.Fail() return } - defer Server.Close() + defer transport.server.Close() // Create a request to send to the logs listener platformDoneEvent := `{ @@ -94,7 +103,7 @@ func TestSubscribeAWSRequest(t *testing.T) { } }` body := []byte(`[` + platformDoneEvent + `]`) - url := "http://" + Listener.Addr().String() + url := "http://" + transport.listener.Addr().String() req, err := http.NewRequest("GET", url, bytes.NewReader(body)) if err != nil { t.Log("Could not create request") @@ -106,16 +115,11 @@ func TestSubscribeAWSRequest(t *testing.T) { t.Logf("Error fetching %s, [%v]", url, err) t.Fail() } - event := <-out + event := <-transport.logsChannel assert.Equal(t, event.Record.RequestId, "6f7f0961f83442118a7af6fe80b88") } func TestSubscribeWithBadLogsRequest(t *testing.T) { - ListenerHost = "localhost" - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - out := make(chan LogEvent) - // Create aws runtime API server and handler awsRuntimeApiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) defer awsRuntimeApiServer.Close() @@ -127,17 +131,18 @@ func TestSubscribeWithBadLogsRequest(t *testing.T) { } // Subscribe to the logs api and start the http server listening for events - if err := Subscribe(ctx, "testID", []EventType{Platform}, out); err != nil { + transport, err := Subscribe(context.Background(), "testID", []EventType{Platform}) + if err != nil { t.Logf("Error subscribing, %v", err) t.Fail() return } - defer Server.Close() + defer transport.server.Close() // Create a request to send to the logs listener logEvent := `{"invalid": "json"}` body := []byte(`[` + logEvent + `]`) - url := "http://" + Listener.Addr().String() + url := "http://" + transport.listener.Addr().String() req, err := http.NewRequest("GET", url, bytes.NewReader(body)) if err != nil { t.Log("Could not create request") diff --git a/apm-lambda-extension/main.go b/apm-lambda-extension/main.go index d5da6429..5e7d9f58 100644 --- a/apm-lambda-extension/main.go +++ b/apm-lambda-extension/main.go @@ -19,7 +19,6 @@ package main import ( "context" - "net/http" "os" "os/signal" "path/filepath" @@ -40,9 +39,8 @@ var ( func main() { + // Global context ctx, cancel := context.WithCancel(context.Background()) - - // Trigger ctx.Done() in all relevant goroutines when main ends defer cancel() sigs := make(chan os.Signal, 1) @@ -71,35 +69,19 @@ func main() { } extension.Log.Debugf("Register response: %v", extension.PrettyPrint(res)) - // Create a channel to buffer apm agent data - agentDataChannel := make(chan extension.AgentData, 100) - - // Start http server to receive data from agent - if err = extension.StartHttpServer(ctx, agentDataChannel, config); err != nil { + // Init APM Server Transport struct and start http server to receive data from agent + apmServerTransport := extension.InitApmServerTransport(config) + agentDataServer, err := extension.StartHttpServer(ctx, apmServerTransport) + if err != nil { extension.Log.Errorf("Could not start APM data receiver : %v", err) } - - // Create a client to use for sending data to the apm server - client := &http.Client{ - Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second, - Transport: http.DefaultTransport.(*http.Transport).Clone(), - } - - // Create a channel used - - // Make channel for collecting logs and create a HTTP server to listen for them - logsChannel := make(chan logsapi.LogEvent) + defer agentDataServer.Close() // Use a wait group to ensure the background go routine sending to the APM server // completes before signaling that the extension is ready for the next invocation. - var backgroundDataSendWg sync.WaitGroup - - if err := logsapi.Subscribe( - ctx, - extensionClient.ExtensionID, - []logsapi.EventType{logsapi.Platform}, - logsChannel, - ); err != nil { + + logsTransport, err := logsapi.Subscribe(ctx, extensionClient.ExtensionID, []logsapi.EventType{logsapi.Platform}) + if err != nil { extension.Log.Warnf("Error while subscribing to the Logs API: %v", err) } @@ -108,109 +90,93 @@ func main() { case <-ctx.Done(): return default: - // call Next method of extension API. This long polling HTTP method - // will block until there's an invocation of the function - extension.Log.Infof("Waiting for next event...") - event, err := extensionClient.NextEvent(ctx) - if err != nil { - status, err := extensionClient.ExitError(ctx, err.Error()) - if err != nil { - panic(err) - } - extension.Log.Errorf("Error: %s", err) - extension.Log.Infof("Exit signal sent to runtime : %s", status) - extension.Log.Infof("Exiting") - return - } - extension.Log.Debug("Received event.") - extension.Log.Debugf("%v", extension.PrettyPrint(event)) - - // Make a channel for signaling that we received the agent flushed signal - extension.AgentDoneSignal = make(chan struct{}) - // Make a channel for signaling that we received the runtimeDone logs API event - runtimeDoneSignal := make(chan struct{}) - // Make a channel for signaling that the function invocation is complete - funcDone := make(chan struct{}) - - // A shutdown event indicates the execution environment is shutting down. - // This is usually due to inactivity. - if event.EventType == extension.Shutdown { - extension.ProcessShutdown() - cancel() - return - } - - // Receive agent data as it comes in and post it to the APM server. - // Stop checking for, and sending agent data when the function invocation - // has completed, signaled via a channel. - backgroundDataSendWg.Add(1) - go func() { - defer backgroundDataSendWg.Done() - if !extension.IsTransportStatusHealthyOrPending() { - return - } - for { - select { - case <-funcDone: - extension.Log.Debug("Received signal that function has completed, not processing any more agent data") - return - case agentData := <-agentDataChannel: - if err := extension.PostToApmServer(client, agentData, config, ctx); err != nil { - extension.Log.Errorf("Error sending to APM server, skipping: %v", err) - return - } - } - } - }() - - // Receive Logs API events - // Send to the runtimeDoneSignal channel to signal when a runtimeDone event is received - go func() { - for { - select { - case <-funcDone: - extension.Log.Debug("Received signal that function has completed, not processing any more log events") - return - case logEvent := <-logsChannel: - extension.Log.Debugf("Received log event %v", logEvent.Type) - // Check the logEvent for runtimeDone and compare the RequestID - // to the id that came in via the Next API - if logEvent.Type == logsapi.RuntimeDone { - if logEvent.Record.RequestId == event.RequestID { - extension.Log.Info("Received runtimeDone event for this function invocation") - runtimeDoneSignal <- struct{}{} - return - } else { - extension.Log.Debug("Log API runtimeDone event request id didn't match") - } - } - } - } - }() - - // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal - flushDeadlineMs := event.DeadlineMs - 100 - durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) - - // Create a timer that expires after durationUntilFlushDeadline - timer := time.NewTimer(durationUntilFlushDeadline) - defer timer.Stop() - - select { - case <-extension.AgentDoneSignal: - extension.Log.Debug("Received agent done signal") - case <-runtimeDoneSignal: - extension.Log.Debug("Received runtimeDone signal") - case <-timer.C: - extension.Log.Info("Time expired waiting for agent signal or runtimeDone event") - } - - close(funcDone) + var backgroundDataSendWg sync.WaitGroup + processEvent(ctx, cancel, apmServerTransport, logsTransport, &backgroundDataSendWg) + extension.Log.Debug("Waiting for background data send to end") backgroundDataSendWg.Wait() if config.SendStrategy == extension.SyncFlush { // Flush APM data now that the function invocation has completed - extension.FlushAPMData(client, agentDataChannel, config, ctx) + apmServerTransport.FlushAPMData(ctx) } } } } + +func processEvent(ctx context.Context, cancel context.CancelFunc, apmServerTransport *extension.ApmServerTransport, logsTransport *logsapi.LogsTransport, backgroundDataSendWg *sync.WaitGroup) { + // Invocation context + invocationCtx, invocationCancel := context.WithCancel(ctx) + defer invocationCancel() + + // call Next method of extension API. This long polling HTTP method + // will block until there's an invocation of the function + extension.Log.Infof("Waiting for next event...") + event, err := extensionClient.NextEvent(ctx) + if err != nil { + status, err := extensionClient.ExitError(ctx, err.Error()) + if err != nil { + panic(err) + } + extension.Log.Errorf("Error: %s", err) + extension.Log.Infof("Exit signal sent to runtime : %s", status) + extension.Log.Infof("Exiting") + return + } + + extension.Log.Debug("Received event.") + extension.Log.Debugf("%v", extension.PrettyPrint(event)) + + if event.EventType == extension.Shutdown { + cancel() + return + } + + // APM Data Processing + apmServerTransport.AgentDoneSignal = make(chan struct{}) + defer close(apmServerTransport.AgentDoneSignal) + backgroundDataSendWg.Add(1) + go func() { + defer backgroundDataSendWg.Done() + if err := apmServerTransport.ForwardApmData(invocationCtx); err != nil { + extension.Log.Error(err) + } + }() + + // Lambda Service Logs Processing + // This goroutine should not be started if subscription failed + runtimeDone := make(chan struct{}) + if logsTransport != nil { + go func() { + if err := logsapi.WaitRuntimeDone(invocationCtx, event.RequestID, logsTransport, runtimeDone); err != nil { + extension.Log.Errorf("Error while processing Lambda Logs ; %v", err) + } else { + close(runtimeDone) + } + }() + } else { + extension.Log.Warn("Logs collection not started due to earlier subscription failure") + close(runtimeDone) + } + + // Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal + flushDeadlineMs := event.DeadlineMs - 100 + durationUntilFlushDeadline := time.Until(time.Unix(flushDeadlineMs/1000, 0)) + + // Create a timer that expires after durationUntilFlushDeadline + timer := time.NewTimer(durationUntilFlushDeadline) + defer timer.Stop() + + // The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of + // the lambda function and the end of the execution of processEvent() + // 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent + // 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function + // 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 100 ms before the specified deadline. + // This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down. + select { + case <-apmServerTransport.AgentDoneSignal: + extension.Log.Debug("Received agent done signal") + case <-runtimeDone: + extension.Log.Debug("Received runtimeDone signal") + case <-timer.C: + extension.Log.Info("Time expired waiting for agent signal or runtimeDone event") + } +} diff --git a/apm-lambda-extension/main_test.go b/apm-lambda-extension/main_test.go index 9c88c0b2..8355f9f3 100644 --- a/apm-lambda-extension/main_test.go +++ b/apm-lambda-extension/main_test.go @@ -47,6 +47,7 @@ const ( InvokeStandard MockEventType = "Standard" InvokeStandardInfo MockEventType = "StandardInfo" InvokeStandardFlush MockEventType = "Flush" + InvokeLateFlush MockEventType = "LateFlush" InvokeWaitgroupsRace MockEventType = "InvokeWaitgroupsRace" InvokeMultipleTransactionsOverload MockEventType = "MultipleTransactionsOverload" Shutdown MockEventType = "Shutdown" @@ -56,6 +57,7 @@ type MockServerInternals struct { Data string WaitForUnlockSignal bool UnlockSignalChannel chan struct{} + WaitGroup sync.WaitGroup } type APMServerBehavior string @@ -145,7 +147,6 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. } // Mock Lambda Server - logsapi.ListenerHost = "localhost" var lambdaServerInternals MockServerInternals lambdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { @@ -161,6 +162,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. return } case "/2020-01-01/extension/event/next": + lambdaServerInternals.WaitGroup.Wait() currId := uuid.New().String() select { case nextEvent := <-eventsChannel: @@ -206,6 +208,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest. func processMockEvent(currId string, event MockEvent, extensionPort string, internals *MockServerInternals) { sendLogEvent(currId, "platform.start") client := http.Client{} + sendRuntimeDone := true switch event.Type { case InvokeHang: time.Sleep(time.Duration(event.Timeout) * time.Second) @@ -220,6 +223,16 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, inte if _, err := client.Do(reqData); err != nil { extension.Log.Error(err.Error()) } + case InvokeLateFlush: + time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) + reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events?flushed=true", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) + internals.WaitGroup.Add(1) + go func() { + if _, err := client.Do(reqData); err != nil { + extension.Log.Error(err.Error()) + } + internals.WaitGroup.Done() + }() case InvokeWaitgroupsRace: time.Sleep(time.Duration(event.ExecutionDuration) * time.Second) reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewBuffer([]byte(event.APMServerBehavior))) @@ -261,7 +274,9 @@ func processMockEvent(currId string, event MockEvent, extensionPort string, inte extension.Log.Debugf("Response seen by the agent : %d", res.StatusCode) case Shutdown: } - sendLogEvent(currId, "platform.runtimeDone") + if sendRuntimeDone { + sendLogEvent(currId, "platform.runtimeDone") + } } func sendNextEventInfo(w http.ResponseWriter, id string, event MockEvent) { @@ -305,7 +320,7 @@ func sendLogEvent(requestId string, logEventType logsapi.SubEventType) { extension.Log.Errorf("Could not encode record : %v", err) return } - host, port, _ := net.SplitHostPort(logsapi.Listener.Addr().String()) + host, port, _ := net.SplitHostPort(logsapi.TestListenerAddr.String()) req, _ := http.NewRequest("POST", "http://"+host+":"+port, bufLogEvent) client := http.Client{} if _, err := client.Do(req); err != nil { @@ -345,7 +360,6 @@ func (suite *MainUnitTestsSuite) SetupTest() { http.DefaultServeMux = new(http.ServeMux) suite.eventsChannel = make(chan MockEvent, 100) suite.lambdaServer, suite.apmServer, suite.apmServerInternals, suite.lambdaServerInternals = initMockServers(suite.eventsChannel) - extension.SetApmServerTransportState(extension.Healthy, suite.ctx) } // This function executes after each test case @@ -375,6 +389,18 @@ func (suite *MainUnitTestsSuite) TestFlush() { assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse))) } +// TestLateFlush checks if there is no race condition between RuntimeDone and AgentDone +// The test is built so that the AgentDone signal is received after RuntimeDone, which causes the next event to be interrupted. +func (suite *MainUnitTestsSuite) TestLateFlush() { + eventsChain := []MockEvent{ + {Type: InvokeLateFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + {Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5}, + } + eventQueueGenerator(eventsChain, suite.eventsChannel) + assert.NotPanics(suite.T(), main) + assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse+TimelyResponse))) +} + // TestWaitGroup checks if there is no race condition between the main waitgroups (issue #128) func (suite *MainUnitTestsSuite) TestWaitGroup() { eventsChain := []MockEvent{ @@ -435,9 +461,6 @@ func (suite *MainUnitTestsSuite) TestAPMServerRecovery() { // TestGracePeriodHangs verifies that the WaitforGracePeriod goroutine ends when main() ends. // This can be checked by asserting that apmTransportStatus is pending after the execution of main. func (suite *MainUnitTestsSuite) TestGracePeriodHangs() { - extension.ApmServerTransportState.Status = extension.Pending - extension.ApmServerTransportState.ReconnectionCount = 100 - eventsChain := []MockEvent{ {Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500}, } @@ -446,7 +469,6 @@ func (suite *MainUnitTestsSuite) TestGracePeriodHangs() { time.Sleep(100 * time.Millisecond) suite.apmServerInternals.UnlockSignalChannel <- struct{}{} - defer assert.Equal(suite.T(), extension.IsTransportStatusHealthyOrPending(), true) } // TestAPMServerCrashesDuringExecution tests that main does not panic nor runs indefinitely when the APM server crashes