Skip to content

Commit

Permalink
Refactor unit tests with cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jlvoiseux committed May 4, 2022
1 parent 83b0ce8 commit 393aeef
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 100 deletions.
6 changes: 6 additions & 0 deletions apm-lambda-extension/logsapi/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func startHTTPServer(transport *LogsTransport) error {
extension.Log.Errorf("Error upon Logs API server start : %v", err)
}
}()

go func() {
<-transport.ctx.Done()
transport.Server.Close()
}()

return nil
}

Expand Down
244 changes: 144 additions & 100 deletions apm-lambda-extension/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"elastic/apm-lambda-extension/logsapi"
"encoding/json"
"fmt"
"github.com/stretchr/testify/suite"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -81,9 +80,7 @@ type ApmInfo struct {
Version string `json:"version"`
}

func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.Server, *MockServerInternals, *MockServerInternals) {

// Mock APM Server
func newMockApmServer(t *testing.T) (*MockServerInternals, *httptest.Server) {
var apmServerInternals MockServerInternals
apmServerInternals.WaitForUnlockSignal = true
apmServerInternals.UnlockSignalChannel = make(chan struct{})
Expand Down Expand Up @@ -135,16 +132,21 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.
}
}
}))

if err := os.Setenv("ELASTIC_APM_LAMBDA_APM_SERVER", apmServer.URL); err != nil {
extension.Log.Fatalf("Could not set environment variable : %v", err)
return nil, nil, nil, nil
return nil, nil
}
if err := os.Setenv("ELASTIC_APM_SECRET_TOKEN", "none"); err != nil {
extension.Log.Fatalf("Could not set environment variable : %v", err)
return nil, nil, nil, nil
return nil, nil
}

// Mock Lambda Server
t.Cleanup(func() { apmServer.Close() })
return &apmServerInternals, apmServer
}

func newMockLambdaServer(t *testing.T, eventsChannel chan MockEvent) *MockServerInternals {
var lambdaServerInternals MockServerInternals
lambdaServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
Expand Down Expand Up @@ -184,7 +186,7 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.
strippedLambdaURL := slicedLambdaURL[1]
if err := os.Setenv("AWS_LAMBDA_RUNTIME_API", strippedLambdaURL); err != nil {
extension.Log.Fatalf("Could not set environment variable : %v", err)
return nil, nil, nil, nil
return nil
}
extensionClient = extension.NewClient(os.Getenv("AWS_LAMBDA_RUNTIME_API"))

Expand All @@ -196,10 +198,26 @@ func initMockServers(eventsChannel chan MockEvent) (*httptest.Server, *httptest.
}
if err = os.Setenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT", fmt.Sprint(extensionPort)); err != nil {
extension.Log.Fatalf("Could not set environment variable : %v", err)
return nil, nil, nil, nil
return nil
}

t.Cleanup(func() { lambdaServer.Close() })
return &lambdaServerInternals
}

// TODO : Move logger out of extension package and stop using it as a package-level variable
func newLogger(t *testing.T, logLevel string) {
if err := os.Setenv("ELASTIC_APM_LOG_LEVEL", logLevel); err != nil {
t.Fail()
}
}

return lambdaServer, apmServer, &apmServerInternals, &lambdaServerInternals
func newTestStructs(t *testing.T) (context.Context, chan MockEvent) {
http.DefaultServeMux = new(http.ServeMux)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() { cancel() })
eventsChannel := make(chan MockEvent, 100)
return ctx, eventsChannel
}

func processMockEvent(currId string, event MockEvent, extensionPort string, internals *MockServerInternals) {
Expand Down Expand Up @@ -319,187 +337,213 @@ func eventQueueGenerator(inputQueue []MockEvent, eventsChannel chan MockEvent) {
}
}

// TESTS
type MainUnitTestsSuite struct {
suite.Suite
eventsChannel chan MockEvent
lambdaServer *httptest.Server
apmServer *httptest.Server
apmServerInternals *MockServerInternals
lambdaServerInternals *MockServerInternals
ctx context.Context
cancel context.CancelFunc
}

func TestMainUnitTestsSuite(t *testing.T) {
suite.Run(t, new(MainUnitTestsSuite))
}

// This function executes before each test case
func (suite *MainUnitTestsSuite) SetupTest() {
if err := os.Setenv("ELASTIC_APM_LOG_LEVEL", "trace"); err != nil {
suite.T().Fail()
}
suite.ctx, suite.cancel = context.WithCancel(context.Background())
http.DefaultServeMux = new(http.ServeMux)
suite.eventsChannel = make(chan MockEvent, 100)
suite.lambdaServer, suite.apmServer, suite.apmServerInternals, suite.lambdaServerInternals = initMockServers(suite.eventsChannel)
}

// This function executes after each test case
func (suite *MainUnitTestsSuite) TearDownTest() {
suite.lambdaServer.Close()
suite.apmServer.Close()
suite.cancel()
}

// TestStandardEventsChain checks a nominal sequence of events (fast APM server, only one standard event)
func (suite *MainUnitTestsSuite) TestStandardEventsChain() {
func TestStandardEventsChain(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
}

// TestFlush checks if the flushed param does not cause a panic or an unexpected behavior
func (suite *MainUnitTestsSuite) TestFlush() {
func TestFlush(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandardFlush, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
}

// TestWaitGroup checks if there is no race condition between the main waitgroups (issue #128)
func (suite *MainUnitTestsSuite) TestWaitGroup() {
func TestWaitGroup(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeWaitgroupsRace, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 500},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
}

// TestAPMServerDown tests that main does not panic nor runs indefinitely when the APM server is inactive.
func (suite *MainUnitTestsSuite) TestAPMServerDown() {
suite.apmServer.Close()
func TestAPMServerDown(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, apmServer := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

apmServer.Close()
eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
}

// TestAPMServerHangs tests that main does not panic nor runs indefinitely when the APM server does not respond.
func (suite *MainUnitTestsSuite) TestAPMServerHangs() {
func TestAPMServerHangs(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs)))
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(Hangs)))
apmServerInternals.UnlockSignalChannel <- struct{}{}
}

// TestAPMServerRecovery tests a scenario where the APM server recovers after hanging.
// The default forwarder timeout is 3 seconds, so we wait 5 seconds until we unlock that hanging state.
// Hence, the APM server is waked up just in time to process the TimelyResponse data frame.
func (suite *MainUnitTestsSuite) TestAPMServerRecovery() {
func TestAPMServerRecovery(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", "1"); err != nil {
suite.T().Fail()
t.Fail()
}

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 5},
{Type: InvokeStandard, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
eventQueueGenerator(eventsChain, eventsChannel)
go func() {
time.Sleep(2500 * time.Millisecond) // Cannot multiply time.Second by a float
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
apmServerInternals.UnlockSignalChannel <- struct{}{}
}()
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Hangs)))
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(Hangs)))
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
if err := os.Setenv("ELASTIC_APM_DATA_FORWARDER_TIMEOUT_SECONDS", ""); err != nil {
suite.T().Fail()
t.Fail()
}
}

// TestGracePeriodHangs verifies that the WaitforGracePeriod goroutine ends when main() ends.
// This can be checked by asserting that apmTransportStatus is pending after the execution of main.
func (suite *MainUnitTestsSuite) TestGracePeriodHangs() {
func TestGracePeriodHangs(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)

time.Sleep(100 * time.Millisecond)
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
apmServerInternals.UnlockSignalChannel <- struct{}{}
}

// TestAPMServerCrashesDuringExecution tests that main does not panic nor runs indefinitely when the APM server crashes
// during execution.
func (suite *MainUnitTestsSuite) TestAPMServerCrashesDuringExecution() {
func TestAPMServerCrashesDuringExecution(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandard, APMServerBehavior: Crashes, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(Crashes)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(apmServerInternals.Data, string(Crashes)))
}

// TestFullChannel checks that an overload of APM data chunks is handled correctly, events dropped beyond the 100th one
// if no space left in channel, no panic, no infinite hanging.
func (suite *MainUnitTestsSuite) TestFullChannel() {
func TestFullChannel(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: TimelyResponse, ExecutionDuration: 0.1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.apmServerInternals.Data, string(TimelyResponse)))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(apmServerInternals.Data, string(TimelyResponse)))
}

// TestFullChannelSlowAPMServer tests what happens when the APM Data channel is full and the APM server is slow
// (send strategy : background)
func (suite *MainUnitTestsSuite) TestFullChannelSlowAPMServer() {
func TestFullChannelSlowAPMServer(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
newMockApmServer(t)
newMockLambdaServer(t, eventsChannel)

if err := os.Setenv("ELASTIC_APM_SEND_STRATEGY", "background"); err != nil {
suite.T().Fail()
t.Fail()
}

eventsChain := []MockEvent{
{Type: InvokeMultipleTransactionsOverload, APMServerBehavior: SlowResponse, ExecutionDuration: 0.01, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
// The test should not hang
if err := os.Setenv("ELASTIC_APM_SEND_STRATEGY", "syncflush"); err != nil {
suite.T().Fail()
t.Fail()
}
}

// TestInfoRequest checks if the extension is able to retrieve APM server info (/ endpoint) (fast APM server, only one standard event)
func (suite *MainUnitTestsSuite) TestInfoRequest() {
func TestInfoRequest(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
newMockApmServer(t)
lambdaServerInternals := newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandardInfo, APMServerBehavior: TimelyResponse, ExecutionDuration: 1, Timeout: 5},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.True(suite.T(), strings.Contains(suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20"))
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.True(t, strings.Contains(lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20"))
}

// TestInfoRequest checks if the extension times out when unable to retrieve APM server info (/ endpoint)
func (suite *MainUnitTestsSuite) TestInfoRequestHangs() {
func TestInfoRequestHangs(t *testing.T) {
newLogger(t, "trace")
_, eventsChannel := newTestStructs(t)
apmServerInternals, _ := newMockApmServer(t)
lambdaServerInternals := newMockLambdaServer(t, eventsChannel)

eventsChain := []MockEvent{
{Type: InvokeStandardInfo, APMServerBehavior: Hangs, ExecutionDuration: 1, Timeout: 500},
}
eventQueueGenerator(eventsChain, suite.eventsChannel)
assert.NotPanics(suite.T(), main)
assert.False(suite.T(), strings.Contains(suite.lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20"))
suite.apmServerInternals.UnlockSignalChannel <- struct{}{}
eventQueueGenerator(eventsChain, eventsChannel)
assert.NotPanics(t, main)
assert.False(t, strings.Contains(lambdaServerInternals.Data, "7814d524d3602e70b703539c57568cba6964fc20"))
apmServerInternals.UnlockSignalChannel <- struct{}{}
}

0 comments on commit 393aeef

Please sign in to comment.