Skip to content

Commit e959d1e

Browse files
authored
fix: ensure buffered logs are flushed (#509)
The buffered logs channel was never flushed. Add a FlushData method to flush the logs channel, and extract the event-handling body into a helper for readability.
1 parent bda7ff5 commit e959d1e

File tree

4 files changed

+124
-71
lines changed

4 files changed

+124
-71
lines changed

accumulator/batch.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,12 @@ type Batch struct {
6868
buf bytes.Buffer
6969
// invocations holds the data for a specific invocation with
7070
// request ID as the key.
71-
invocations map[string]*Invocation
72-
count int
73-
age time.Time
74-
maxSize int
75-
maxAge time.Duration
71+
invocations map[string]*Invocation
72+
count int
73+
age time.Time
74+
maxSize int
75+
maxAge time.Duration
76+
platformStartRequestID string
7677
// currentlyExecutingRequestID represents the request ID of the currently
7778
// executing lambda invocation. The ID can be set either on agent init or
7879
// when extension receives the invoke event. If the agent hooks into the
@@ -223,6 +224,14 @@ func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) err
223224
return b.finalizeInvocation(reqID, status, time)
224225
}
225226

227+
// OnPlatformStart stores reqID in the batch as the platform start request
// ID, overwriting any previously stored value. The stored value is read back
// through PlatformStartReqID.
// NOTE(review): no locking is visible in this method; confirm whether
// access to Batch fields needs synchronization here.
func (b *Batch) OnPlatformStart(reqID string) {
228+
b.platformStartRequestID = reqID
229+
}
230+
231+
// PlatformStartReqID returns the request ID currently stored in the
// platformStartRequestID field, i.e. the value last passed to
// OnPlatformStart (the zero value "" if the field was never assigned).
func (b *Batch) PlatformStartReqID() string {
232+
return b.platformStartRequestID
233+
}
234+
226235
// OnPlatformReport should be the last event for a request ID. On receiving the
227236
// platform.report event the batch will clean up any data structure for the request
228237
// ID. It will return some of the function metadata to allow the caller to enrich

app/run.go

+4
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ func (app *App) Run(ctx context.Context) error {
9696
backgroundDataSendWg.Wait()
9797
if event.EventType == extension.Shutdown {
9898
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
99+
// Flush buffered logs if any
100+
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
99101
// Since we have waited for the processEvent loop to finish we
100102
// already have received all the data we can from the agent. So, we
101103
// flush all the data to make sure that shutdown can correctly deduce
@@ -125,6 +127,8 @@ func (app *App) Run(ctx context.Context) error {
125127
// that the underlying transport is reset for next invocation without
126128
// waiting for grace period if it got to unhealthy state.
127129
flushCtx, cancel := context.WithCancel(ctx)
130+
// Flush buffered logs if any
131+
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
128132
// Flush APM data now that the function invocation has completed
129133
app.apmClient.FlushAPMData(flushCtx)
130134
cancel()

logsapi/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,12 @@ type ClientOption func(*Client)
4848

4949
// invocationLifecycler is the set of invocation-tracking callbacks and
// queries the logs API client invokes while handling log events.
type invocationLifecycler interface {
5050
// OnLambdaLogRuntimeDone is called for a platform runtime done log event
// with the event's request ID, status and timestamp.
OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error
51+
// OnPlatformStart is called for a platform start log event with the
// event's request ID.
OnPlatformStart(reqID string)
5152
// OnPlatformReport is called for a platform report log event. The returned
// function ARN, deadline and timestamp are used by the client to build the
// platform report metrics.
OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error)
53+
// PlatformStartReqID is to identify the requestID for the function
54+
// logs under the assumption that function logs for a specific request
55+
// ID will be bounded by PlatformStart and PlatformEnd events.
56+
PlatformStartReqID() string
5257
// Size should return the number of invocations waiting on platform.report
5358
Size() int
5459
}

logsapi/event.go

+101-66
Original file line numberDiff line numberDiff line change
@@ -63,79 +63,114 @@ func (lc *Client) ProcessLogs(
6363
dataChan chan []byte,
6464
isShutdown bool,
6565
) {
66-
// platformStartReqID is to identify the requestID for the function
67-
// logs under the assumption that function logs for a specific request
68-
// ID will be bounded by PlatformStart and PlatformEnd events.
69-
var platformStartReqID string
7066
for {
7167
select {
7268
case logEvent := <-lc.logsChannel:
73-
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
74-
switch logEvent.Type {
75-
case PlatformStart:
76-
platformStartReqID = logEvent.Record.RequestID
77-
case PlatformRuntimeDone:
78-
if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone(
79-
logEvent.Record.RequestID,
80-
logEvent.Record.Status,
81-
logEvent.Time,
82-
); err != nil {
83-
lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err)
84-
}
85-
// For invocation events the platform.runtimeDone would be the last possible event.
86-
if !isShutdown && logEvent.Record.RequestID == requestID {
87-
lc.logger.Debugf(
88-
"Processed runtime done event for reqID %s as the last log event for the invocation",
89-
logEvent.Record.RequestID,
90-
)
91-
return
92-
}
93-
case PlatformReport:
94-
fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID)
95-
if err != nil {
96-
lc.logger.Warnf("Failed to process platform report: %v", err)
97-
} else {
98-
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
99-
processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
100-
if err != nil {
101-
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
102-
} else {
103-
select {
104-
case dataChan <- processedMetrics:
105-
case <-ctx.Done():
106-
}
107-
}
108-
}
109-
// For shutdown event the platform report metrics for the previous log event
110-
// would be the last possible log event. After processing this metric the
111-
// invocation lifecycler's cache should be empty.
112-
if isShutdown && lc.invocationLifecycler.Size() == 0 {
113-
lc.logger.Debugf(
114-
"Processed platform report event for reqID %s as the last log event before shutdown",
115-
logEvent.Record.RequestID,
116-
)
117-
return
118-
}
119-
case PlatformLogsDropped:
120-
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
121-
case FunctionLog:
122-
processedLog, err := ProcessFunctionLog(
123-
platformStartReqID,
124-
invokedFnArn,
125-
logEvent,
126-
)
127-
if err != nil {
128-
lc.logger.Warnf("Error processing function log : %v", err)
129-
} else {
130-
select {
131-
case dataChan <- processedLog:
132-
case <-ctx.Done():
133-
}
134-
}
69+
if shouldExit := lc.handleEvent(logEvent, ctx, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
70+
return
13571
}
13672
case <-ctx.Done():
13773
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")
13874
return
13975
}
14076
}
14177
}
78+
79+
// FlushData drains the log events that are currently buffered in
// lc.logsChannel without blocking, handing each one to handleEvent together
// with requestID, invokedFnArn, dataChan and isShutdown.
//
// It returns as soon as one of the following happens:
//   - handleEvent reports that processing should stop,
//   - ctx is done,
//   - no event is immediately available and the channel is empty.
func (lc *Client) FlushData(
80+
ctx context.Context,
81+
requestID string,
82+
invokedFnArn string,
83+
dataChan chan []byte,
84+
isShutdown bool,
85+
) {
86+
lc.logger.Infof("flushing %d buffered logs", len(lc.logsChannel))
87+
for {
88+
select {
89+
case logEvent := <-lc.logsChannel:
90+
if shouldExit := lc.handleEvent(logEvent, ctx, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
91+
return
92+
}
93+
case <-ctx.Done():
94+
lc.logger.Debug("Current invocation over. Interrupting logs flushing")
95+
return
96+
// The default case makes the select non-blocking: it is taken only when
// neither an event nor ctx cancellation is ready at this instant.
default:
97+
// Stop once the buffer is empty; otherwise loop again to pick up an
// event that arrived after the select was evaluated.
if len(lc.logsChannel) == 0 {
98+
lc.logger.Debug("Flush ended for logs - no data in buffer")
99+
return
100+
}
101+
}
102+
}
103+
}
104+
105+
// handleEvent processes a single log event according to its type and reports
// whether the caller should stop consuming further events.
//
// Per event type:
//   - PlatformStart: forwards the request ID to the invocation lifecycler.
//   - PlatformRuntimeDone: notifies the invocation lifecycler (a failure is
//     only logged) and, outside of shutdown, returns true when the event's
//     request ID equals requestID.
//   - PlatformReport: asks the invocation lifecycler for the invocation
//     metadata, builds the platform metrics and sends them on dataChan;
//     during shutdown it returns true once the lifecycler's Size() is 0.
//   - PlatformLogsDropped: logs a warning.
//   - FunctionLog: builds the processed log using the lifecycler's
//     PlatformStartReqID() and invokedFnArn, and sends it on dataChan.
//
// It returns false in every other case, including unknown event types.
// NOTE(review): Go convention places ctx as the first parameter; changing
// the order here would require updating every caller at the same time.
func (lc *Client) handleEvent(logEvent LogEvent,
106+
ctx context.Context,
107+
requestID string,
108+
invokedFnArn string,
109+
dataChan chan []byte,
110+
isShutdown bool,
111+
) bool {
112+
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
113+
switch logEvent.Type {
114+
case PlatformStart:
115+
lc.invocationLifecycler.OnPlatformStart(logEvent.Record.RequestID)
116+
case PlatformRuntimeDone:
117+
if err := lc.invocationLifecycler.OnLambdaLogRuntimeDone(
118+
logEvent.Record.RequestID,
119+
logEvent.Record.Status,
120+
logEvent.Time,
121+
); err != nil {
122+
lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err)
123+
}
124+
// For invocation events the platform.runtimeDone would be the last possible event.
125+
if !isShutdown && logEvent.Record.RequestID == requestID {
126+
lc.logger.Debugf(
127+
"Processed runtime done event for reqID %s as the last log event for the invocation",
128+
logEvent.Record.RequestID,
129+
)
130+
return true
131+
}
132+
case PlatformReport:
133+
fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID)
134+
if err != nil {
135+
lc.logger.Warnf("Failed to process platform report: %v", err)
136+
} else {
137+
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
138+
processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
139+
if err != nil {
140+
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
141+
} else {
142+
// Send the metrics unless ctx is done first; in that case the
// metrics are dropped rather than blocking on dataChan.
select {
143+
case dataChan <- processedMetrics:
144+
case <-ctx.Done():
145+
}
146+
}
147+
}
148+
// For shutdown event the platform report metrics for the previous log event
149+
// would be the last possible log event. After processing this metric the
150+
// invocation lifecycler's cache should be empty.
151+
// This check runs whether or not the report was processed successfully.
if isShutdown && lc.invocationLifecycler.Size() == 0 {
152+
lc.logger.Debugf(
153+
"Processed platform report event for reqID %s as the last log event before shutdown",
154+
logEvent.Record.RequestID,
155+
)
156+
return true
157+
}
158+
case PlatformLogsDropped:
159+
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
160+
case FunctionLog:
161+
processedLog, err := ProcessFunctionLog(
162+
lc.invocationLifecycler.PlatformStartReqID(),
163+
invokedFnArn,
164+
logEvent,
165+
)
166+
if err != nil {
167+
lc.logger.Warnf("Error processing function log : %v", err)
168+
} else {
169+
// Send the processed log unless ctx is done first; in that case the
// log is dropped rather than blocking on dataChan.
select {
170+
case dataChan <- processedLog:
171+
case <-ctx.Done():
172+
}
173+
}
174+
}
175+
return false
176+
}

0 commit comments

Comments
 (0)