Skip to content

Commit 83b0ce8

Browse files
committed
Remove state variables
1 parent 43ca282 commit 83b0ce8

File tree

11 files changed

+126
-115
lines changed

11 files changed

+126
-115
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
RUN_E2E_TESTS=true
1+
RUN_E2E_TESTS=false
22
ELASTIC_APM_LOG_LEVEL=info

apm-lambda-extension/extension/apm_server.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,6 @@ import (
3232
"time"
3333
)
3434

35-
var bufferPool = sync.Pool{New: func() interface{} {
36-
return &bytes.Buffer{}
37-
}}
38-
3935
// Constants for the state of the transport used in
4036
// the backoff implementation.
4137
type ApmServerTransportStatusType string
@@ -49,10 +45,11 @@ const (
4945
// A struct to track the state and status of sending
5046
// to the APM server. Used in the backoff implementation.
5147
type ApmServerTransport struct {
52-
sync.Pool
5348
sync.Mutex
49+
bufferPool sync.Pool
5450
ctx context.Context
5551
config *extensionConfig
52+
AgentDoneSignal chan struct{}
5653
DataChannel chan AgentData
5754
Client *http.Client
5855
Status ApmServerTransportStatusType
@@ -62,6 +59,10 @@ type ApmServerTransport struct {
6259

6360
func InitApmServerTransport(ctx context.Context, config *extensionConfig) *ApmServerTransport {
6461
var transport ApmServerTransport
62+
transport.bufferPool = sync.Pool{New: func() interface{} {
63+
return &bytes.Buffer{}
64+
}}
65+
transport.AgentDoneSignal = make(chan struct{}, 1)
6566
transport.DataChannel = make(chan AgentData, 100)
6667
transport.Client = &http.Client{
6768
Timeout: time.Duration(config.DataForwarderTimeoutSeconds) * time.Second,
@@ -74,7 +75,10 @@ func InitApmServerTransport(ctx context.Context, config *extensionConfig) *ApmSe
7475
return &transport
7576
}
7677

77-
func StartBackgroundSending(transport *ApmServerTransport, funcDone chan struct{}, backgroundDataSendWg *sync.WaitGroup) {
78+
// StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server.
79+
// Stop checking for, and sending agent data when the function invocation
80+
// has completed, signaled via a channel.
81+
func StartBackgroundApmDataForwarding(transport *ApmServerTransport, funcDone chan struct{}, backgroundDataSendWg *sync.WaitGroup) {
7882
go func() {
7983
defer backgroundDataSendWg.Done()
8084
if transport.Status == Failing {
@@ -115,10 +119,10 @@ func PostToApmServer(transport *ApmServerTransport, agentData AgentData) error {
115119
r = bytes.NewReader(agentData.Data)
116120
} else {
117121
encoding = "gzip"
118-
buf := bufferPool.Get().(*bytes.Buffer)
122+
buf := transport.bufferPool.Get().(*bytes.Buffer)
119123
defer func() {
120124
buf.Reset()
121-
bufferPool.Put(buf)
125+
transport.bufferPool.Put(buf)
122126
}()
123127
gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
124128
if err != nil {

apm-lambda-extension/extension/http_server.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,36 +23,34 @@ import (
2323
"time"
2424
)
2525

26-
var agentDataServer *http.Server
27-
2826
// StartHttpServer starts the server listening for APM agent data.
29-
func StartHttpServer(transport *ApmServerTransport) (err error) {
27+
func StartHttpServer(transport *ApmServerTransport) (agentDataServer *http.Server, err error) {
3028
mux := http.NewServeMux()
3129
mux.HandleFunc("/", handleInfoRequest(transport))
32-
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(transport.DataChannel))
30+
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(transport))
3331
timeout := time.Duration(transport.config.dataReceiverTimeoutSeconds) * time.Second
34-
agentDataServer = &http.Server{
32+
server := &http.Server{
3533
Addr: transport.config.dataReceiverServerPort,
3634
Handler: mux,
3735
ReadTimeout: timeout,
3836
WriteTimeout: timeout,
3937
MaxHeaderBytes: 1 << 20,
4038
}
4139

42-
ln, err := net.Listen("tcp", agentDataServer.Addr)
40+
ln, err := net.Listen("tcp", server.Addr)
4341
if err != nil {
4442
return
4543
}
4644

4745
go func() {
48-
Log.Infof("Extension listening for apm data on %s", agentDataServer.Addr)
49-
if err = agentDataServer.Serve(ln); err != nil {
46+
Log.Infof("Extension listening for apm data on %s", server.Addr)
47+
if err = server.Serve(ln); err != nil {
5048
if err.Error() == "http: Server closed" {
5149
Log.Debug(err)
5250
} else {
5351
Log.Errorf("Error upon APM data server start : %v", err)
5452
}
5553
}
5654
}()
57-
return nil
55+
return server, nil
5856
}

apm-lambda-extension/extension/http_server_test.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ func TestInfoProxy(t *testing.T) {
5959
dataReceiverTimeoutSeconds: 15,
6060
}
6161
transport := InitApmServerTransport(context.Background(), &config)
62-
if err := StartHttpServer(transport); err != nil {
62+
agentDataServer, err := StartHttpServer(transport)
63+
if err != nil {
6364
t.Fail()
6465
return
6566
}
@@ -108,7 +109,8 @@ func TestInfoProxyErrorStatusCode(t *testing.T) {
108109
}
109110
transport := InitApmServerTransport(context.Background(), &config)
110111

111-
if err := StartHttpServer(transport); err != nil {
112+
agentDataServer, err := StartHttpServer(transport)
113+
if err != nil {
112114
t.Fail()
113115
return
114116
}
@@ -152,7 +154,8 @@ func Test_handleInfoRequest(t *testing.T) {
152154
transport := InitApmServerTransport(context.Background(), &config)
153155

154156
// Start extension server
155-
if err := StartHttpServer(transport); err != nil {
157+
agentDataServer, err := StartHttpServer(transport)
158+
if err != nil {
156159
t.Fail()
157160
return
158161
}
@@ -188,10 +191,10 @@ func (errReader) Read(_ []byte) (int, error) {
188191
}
189192

190193
func Test_handleInfoRequestInvalidBody(t *testing.T) {
191-
testChan := make(chan AgentData)
194+
transport := InitApmServerTransport(context.Background(), &extensionConfig{})
192195
mux := http.NewServeMux()
193196
urlPath := "/intake/v2/events"
194-
mux.HandleFunc(urlPath, handleIntakeV2Events(testChan))
197+
mux.HandleFunc(urlPath, handleIntakeV2Events(transport))
195198
req := httptest.NewRequest(http.MethodGet, urlPath, errReader(0))
196199
recorder := httptest.NewRecorder()
197200

@@ -202,8 +205,6 @@ func Test_handleInfoRequestInvalidBody(t *testing.T) {
202205
func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
203206
body := []byte(`{"metadata": {}`)
204207

205-
AgentDoneSignal = make(chan struct{})
206-
207208
// Create apm server and handler
208209
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
209210
}))
@@ -217,7 +218,8 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
217218
}
218219
transport := InitApmServerTransport(context.Background(), &config)
219220

220-
if err := StartHttpServer(transport); err != nil {
221+
agentDataServer, err := StartHttpServer(transport)
222+
if err != nil {
221223
t.Fail()
222224
return
223225
}
@@ -245,7 +247,7 @@ func Test_handleIntakeV2EventsQueryParam(t *testing.T) {
245247
defer timer.Stop()
246248

247249
select {
248-
case <-AgentDoneSignal:
250+
case <-transport.AgentDoneSignal:
249251
<-transport.DataChannel
250252
case <-timer.C:
251253
t.Log("Timed out waiting for server to send FuncDone signal")
@@ -269,7 +271,8 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
269271
}
270272
transport := InitApmServerTransport(context.Background(), &config)
271273

272-
if err := StartHttpServer(transport); err != nil {
274+
agentDataServer, err := StartHttpServer(transport)
275+
if err != nil {
273276
t.Fail()
274277
return
275278
}
@@ -298,8 +301,6 @@ func Test_handleIntakeV2EventsNoQueryParam(t *testing.T) {
298301
func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
299302
body := []byte(``)
300303

301-
AgentDoneSignal = make(chan struct{})
302-
303304
// Create apm server and handler
304305
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
305306
}))
@@ -313,7 +314,8 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
313314
}
314315
transport := InitApmServerTransport(context.Background(), &config)
315316

316-
if err := StartHttpServer(transport); err != nil {
317+
agentDataServer, err := StartHttpServer(transport)
318+
if err != nil {
317319
t.Fail()
318320
return
319321
}
@@ -341,7 +343,7 @@ func Test_handleIntakeV2EventsQueryParamEmptyData(t *testing.T) {
341343
defer timer.Stop()
342344

343345
select {
344-
case <-AgentDoneSignal:
346+
case <-transport.AgentDoneSignal:
345347
case <-timer.C:
346348
t.Log("Timed out waiting for server to send FuncDone signal")
347349
t.Fail()

apm-lambda-extension/extension/process_events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package extension
1919

2020
import (
2121
"encoding/json"
22+
"net/http"
2223
)
2324

2425
// ProcessShutdown processes the Shutdown event received from the
2526
// Lambda runtime API.
26-
func ProcessShutdown() {
27+
func ProcessShutdown(agentDataServer *http.Server, logsServer *http.Server) {
2728
Log.Info("Received SHUTDOWN event, exiting")
2829
agentDataServer.Close()
30+
logsServer.Close()
2931
}
3032

3133
// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.

apm-lambda-extension/extension/route_handlers.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type AgentData struct {
3030
ContentEncoding string
3131
}
3232

33-
var AgentDoneSignal chan struct{}
3433
var currApmServerTransport *ApmServerTransport
3534

3635
// URL: http://server/
@@ -77,7 +76,7 @@ func reverseProxyErrorHandler(res http.ResponseWriter, req *http.Request, err er
7776
}
7877

7978
// URL: http://server/intake/v2/events
80-
func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) {
79+
func handleIntakeV2Events(transport *ApmServerTransport) func(w http.ResponseWriter, r *http.Request) {
8180
return func(w http.ResponseWriter, r *http.Request) {
8281

8382
Log.Debug("Handling APM Data Intake")
@@ -95,11 +94,11 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit
9594
ContentEncoding: r.Header.Get("Content-Encoding"),
9695
}
9796

98-
EnqueueAPMData(agentDataChan, agentData)
97+
EnqueueAPMData(transport.DataChannel, agentData)
9998
}
10099

101100
if len(r.URL.Query()["flushed"]) > 0 && r.URL.Query()["flushed"][0] == "true" {
102-
AgentDoneSignal <- struct{}{}
101+
transport.AgentDoneSignal <- struct{}{}
103102
}
104103

105104
w.WriteHeader(http.StatusAccepted)

apm-lambda-extension/logsapi/route_handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"time"
2525
)
2626

27-
func handleLogEventsRequest(out chan LogEvent) func(w http.ResponseWriter, r *http.Request) {
27+
func handleLogEventsRequest(transport *LogsTransport) func(w http.ResponseWriter, r *http.Request) {
2828

2929
return func(w http.ResponseWriter, r *http.Request) {
3030
var logEvents []LogEvent
@@ -40,7 +40,7 @@ func handleLogEventsRequest(out chan LogEvent) func(w http.ResponseWriter, r *ht
4040
w.WriteHeader(http.StatusInternalServerError)
4141
continue
4242
}
43-
out <- logEvents[idx]
43+
transport.LogsChannel <- logEvents[idx]
4444
}
4545
}
4646
}

0 commit comments

Comments
 (0)