Skip to content

Commit 2cab1ad

Browse files
kruskallrockdaboot
andauthored
fix: forward logs directly when invocation is over (#613)
* fix: forward logs directly when invocation is over do not send to the lambda data chan and potentially wait forever causing a timeout on shutdown * Update run.go * Apply suggestions from code review Co-authored-by: Tim Rühsen <[email protected]> --------- Co-authored-by: Tim Rühsen <[email protected]>
1 parent 8c3f5b2 commit 2cab1ad

File tree

3 files changed

+26
-20
lines changed

3 files changed

+26
-20
lines changed

apmproxy/apmserver.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
6363
c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo)
6464
continue
6565
}
66-
if err := c.forwardAgentData(ctx, data); err != nil {
66+
if err := c.ForwardAgentData(ctx, data); err != nil {
6767
return err
6868
}
6969
if lambdaDataChan == nil {
@@ -73,7 +73,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
7373
c.logger.Debug("Assigned Lambda data channel")
7474
}
7575
case data := <-lambdaDataChan:
76-
if err := c.forwardLambdaData(ctx, data); err != nil {
76+
if err := c.ForwardLambdaData(ctx, data); err != nil {
7777
return err
7878
}
7979
}
@@ -91,7 +91,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
9191
// Flush agent data first to make sure metadata is available if possible
9292
for i := len(c.AgentDataChannel); i > 0; i-- {
9393
data := <-c.AgentDataChannel
94-
if err := c.forwardAgentData(ctx, data); err != nil {
94+
if err := c.ForwardAgentData(ctx, data); err != nil {
9595
c.logger.Errorf("Error sending to APM Server, skipping: %v", err)
9696
}
9797
}
@@ -107,7 +107,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
107107
for {
108108
select {
109109
case apmData := <-c.LambdaDataChannel:
110-
if err := c.forwardLambdaData(ctx, apmData); err != nil {
110+
if err := c.ForwardLambdaData(ctx, apmData); err != nil {
111111
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
112112
}
113113
case <-ctx.Done():
@@ -349,7 +349,7 @@ func (c *Client) WaitForFlush() <-chan struct{} {
349349
return c.flushCh
350350
}
351351

352-
func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMData) error {
352+
func (c *Client) ForwardAgentData(ctx context.Context, apmData accumulator.APMData) error {
353353
if err := c.batch.AddAgentData(apmData); err != nil {
354354
c.logger.Warnf("Dropping agent data due to error: %v", err)
355355
}
@@ -359,7 +359,7 @@ func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMDa
359359
return nil
360360
}
361361

362-
func (c *Client) forwardLambdaData(ctx context.Context, data []byte) error {
362+
func (c *Client) ForwardLambdaData(ctx context.Context, data []byte) error {
363363
if err := c.batch.AddLambdaData(data); err != nil {
364364
c.logger.Warnf("Dropping lambda data due to error: %v", err)
365365
}

app/run.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (app *App) Run(ctx context.Context) error {
9898
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
9999
if app.logsClient != nil {
100100
// Flush buffered logs if any
101-
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
101+
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.ForwardLambdaData, true)
102102
}
103103
// Since we have waited for the processEvent loop to finish we
104104
// already have received all the data we can from the agent. So, we
@@ -131,7 +131,7 @@ func (app *App) Run(ctx context.Context) error {
131131
flushCtx, cancel := context.WithCancel(ctx)
132132
if app.logsClient != nil {
133133
// Flush buffered logs if any
134-
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
134+
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.ForwardLambdaData, false)
135135
}
136136
// Flush APM data now that the function invocation has completed
137137
app.apmClient.FlushAPMData(flushCtx)
@@ -213,7 +213,13 @@ func (app *App) processEvent(
213213
invocationCtx,
214214
event.RequestID,
215215
event.InvokedFunctionArn,
216-
app.apmClient.LambdaDataChannel,
216+
func(ctx context.Context, b []byte) error {
217+
select {
218+
case app.apmClient.LambdaDataChannel <- b:
219+
case <-ctx.Done():
220+
}
221+
return nil
222+
},
217223
event.EventType == extension.Shutdown,
218224
)
219225
}()

logsapi/event.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
// LogEventType represents the log type that is received in the log messages
2626
type LogEventType string
2727

28+
type Forwarder func(context.Context, []byte) error
29+
2830
const (
2931
// PlatformRuntimeDone event is sent when lambda function is finished it's execution
3032
PlatformRuntimeDone LogEventType = "platform.runtimeDone"
@@ -60,13 +62,13 @@ func (lc *Client) ProcessLogs(
6062
ctx context.Context,
6163
requestID string,
6264
invokedFnArn string,
63-
dataChan chan []byte,
65+
forwardFn Forwarder,
6466
isShutdown bool,
6567
) {
6668
for {
6769
select {
6870
case logEvent := <-lc.logsChannel:
69-
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
71+
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, forwardFn, isShutdown); shouldExit {
7072
return
7173
}
7274
case <-ctx.Done():
@@ -80,14 +82,14 @@ func (lc *Client) FlushData(
8082
ctx context.Context,
8183
requestID string,
8284
invokedFnArn string,
83-
dataChan chan []byte,
85+
forwardFn Forwarder,
8486
isShutdown bool,
8587
) {
8688
lc.logger.Infof("flushing %d buffered logs", len(lc.logsChannel))
8789
for {
8890
select {
8991
case logEvent := <-lc.logsChannel:
90-
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
92+
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, forwardFn, isShutdown); shouldExit {
9193
return
9294
}
9395
case <-ctx.Done():
@@ -106,7 +108,7 @@ func (lc *Client) handleEvent(ctx context.Context,
106108
logEvent LogEvent,
107109
requestID string,
108110
invokedFnArn string,
109-
dataChan chan []byte,
111+
forwardFn Forwarder,
110112
isShutdown bool,
111113
) bool {
112114
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
@@ -139,9 +141,8 @@ func (lc *Client) handleEvent(ctx context.Context,
139141
if err != nil {
140142
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
141143
} else {
142-
select {
143-
case dataChan <- processedMetrics:
144-
case <-ctx.Done():
144+
if err := forwardFn(ctx, processedMetrics); err != nil {
145+
lc.logger.Errorf("Error forwarding Lambda platform metrics: %v", err)
145146
}
146147
}
147148
}
@@ -166,9 +167,8 @@ func (lc *Client) handleEvent(ctx context.Context,
166167
if err != nil {
167168
lc.logger.Warnf("Error processing function log : %v", err)
168169
} else {
169-
select {
170-
case dataChan <- processedLog:
171-
case <-ctx.Done():
170+
if err := forwardFn(ctx, processedLog); err != nil {
171+
lc.logger.Warnf("Error forwarding function log : %v", err)
172172
}
173173
}
174174
}

0 commit comments

Comments
 (0)