@@ -19,7 +19,6 @@ package main
19
19
import (
20
20
"fmt"
21
21
"io/ioutil"
22
- "log"
23
22
"net/http"
24
23
"os"
25
24
"os/exec"
@@ -28,9 +27,12 @@ import (
28
27
"sync"
29
28
"time"
30
29
30
+ "github.com/google/uuid"
31
31
"github.com/kelseyhightower/envconfig"
32
+ "go.uber.org/zap"
32
33
33
34
"github.com/triggermesh/aws-custom-runtime/pkg/converter"
35
+ "github.com/triggermesh/aws-custom-runtime/pkg/logger"
34
36
"github.com/triggermesh/aws-custom-runtime/pkg/metrics"
35
37
"github.com/triggermesh/aws-custom-runtime/pkg/sender"
36
38
)
@@ -62,7 +64,7 @@ type Specification struct {
62
64
// Request body size limit, Mb
63
65
RequestSizeLimit int64 `envconfig:"request_size_limit" default:"5"`
64
66
// Funtions deadline, seconds
65
- FunctionTTL int64 `envconfig:"function_ttl" default:"10 "`
67
+ FunctionTTL time. Duration `envconfig:"function_ttl" default:"10s "`
66
68
// Lambda runtime API port for functions
67
69
InternalAPIport string `envconfig:"internal_api_port" default:"80"`
68
70
// Lambda API port to put function requests and get results
@@ -77,14 +79,15 @@ type Handler struct {
77
79
sender * sender.Sender
78
80
converter converter.Converter
79
81
reporter * metrics.EventProcessingStatsReporter
82
+ logger * zap.SugaredLogger
80
83
81
84
requestSizeLimit int64
82
- functionTTL int64
85
+ functionTTL time. Duration
83
86
}
84
87
85
88
type message struct {
86
89
id string
87
- deadline int64
90
+ deadline time. Time
88
91
data []byte
89
92
context map [string ]string
90
93
statusCode int
@@ -114,6 +117,7 @@ func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
114
117
body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
115
118
if err != nil {
116
119
h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
120
+ h .logger .Error ("Request exceeds allowed size limit, rejecting" )
117
121
http .Error (w , err .Error (), http .StatusRequestEntityTooLarge )
118
122
return
119
123
}
@@ -122,34 +126,37 @@ func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
122
126
req , context , err := h .converter .Request (body , r .Header )
123
127
if err != nil {
124
128
h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
129
+ h .logger .Errorf ("Cannot convert request: %v" , err )
125
130
http .Error (w , err .Error (), http .StatusInternalServerError )
126
131
return
127
132
}
128
133
129
134
eventTypeTag , eventSrcTag = metrics .CETagsFromContext (context )
130
135
131
- result := enqueue (req , context , h .functionTTL * 1e+9 )
136
+ h .logger .Debugf ("Enqueuing request: %+v, %s" , context , string (req ))
137
+ result := enqueue (req , context , h .functionTTL )
138
+ h .logger .Debugf ("Result: %+v, %s" , result .context , string (result .data ))
139
+
132
140
result .data , err = h .converter .Response (result .data )
133
141
if err != nil {
134
142
result .data = []byte (fmt .Sprintf ("Response conversion error: %v" , err ))
143
+ h .logger .Errorf ("Cannot convert response: %v" , err )
135
144
}
136
145
if err := h .sender .Send (result .data , result .statusCode , w ); err != nil {
137
146
h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
138
- log . Printf ( "! %s %s %v\n " , result . id , result . data , err )
147
+ h . logger . Errorf ( "Cannot send response: %v" , err )
139
148
return
140
149
}
141
150
h .reporter .ReportProcessingSuccess (eventTypeTag , eventSrcTag )
142
151
}
143
152
144
- func enqueue (request []byte , context map [string ]string , ttl int64 ) message {
145
- now := time .Now ().UnixNano ()
153
+ func enqueue (request []byte , context map [string ]string , ttl time.Duration ) message {
146
154
task := message {
147
- id : fmt . Sprintf ( "%d" , now ),
148
- deadline : now + ttl ,
155
+ id : uuid . New (). String ( ),
156
+ deadline : time . Now (). Add ( ttl ) ,
149
157
data : request ,
150
158
context : context ,
151
159
}
152
- log .Printf ("<- %s\n " , task .id )
153
160
154
161
resultsChannel := make (chan message )
155
162
mutex .Lock ()
@@ -161,7 +168,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
161
168
162
169
var resp message
163
170
select {
164
- case <- time .After (time . Duration ( ttl ) ):
171
+ case <- time .After (ttl ):
165
172
resp = message {
166
173
id : task .id ,
167
174
data : []byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )),
@@ -173,7 +180,6 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
173
180
mutex .Lock ()
174
181
delete (results , task .id )
175
182
mutex .Unlock ()
176
- log .Printf ("-> %s %d\n " , resp .id , resp .statusCode )
177
183
return resp
178
184
}
179
185
@@ -182,7 +188,7 @@ func getTask(w http.ResponseWriter, r *http.Request) {
182
188
183
189
// Dummy headers required by Rust client. Replace with something meaningful
184
190
w .Header ().Set ("Lambda-Runtime-Aws-Request-Id" , task .id )
185
- w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .deadline )))
191
+ w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .deadline . UnixMilli () )))
186
192
w .Header ().Set ("Lambda-Runtime-Invoked-Function-Arn" , "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime" )
187
193
w .Header ().Set ("Lambda-Runtime-Trace-Id" , "0" )
188
194
for k , v := range task .context {
@@ -193,36 +199,39 @@ func getTask(w http.ResponseWriter, r *http.Request) {
193
199
w .Write (task .data )
194
200
}
195
201
196
- func initError (w http.ResponseWriter , r * http.Request ) {
202
+ func ( h * Handler ) initError (w http.ResponseWriter , r * http.Request ) {
197
203
data , err := ioutil .ReadAll (r .Body )
198
204
if err != nil {
199
- log . Fatalln ( err )
205
+ h . logger . Fatalf ( "Cannot read initialization error data: %v" , err )
200
206
}
201
207
defer r .Body .Close ()
202
208
203
- log . Fatalf ("Runtime initialization error: %s\n " , data )
209
+ h . logger . Fatalf ("Runtime initialization error: %s" , data )
204
210
}
205
211
206
212
func parsePath (query string ) (string , string , error ) {
207
213
path := strings .TrimPrefix (query , awsEndpoint + "/invocation/" )
208
214
request := strings .Split (path , "/" )
209
215
if len (request ) != 2 {
210
- return "" , "" , fmt .Errorf ("incorrect URL query size " )
216
+ return "" , "" , fmt .Errorf ("incorrect URL path " )
211
217
}
212
218
return request [0 ], request [1 ], nil
213
219
}
214
220
215
- func responseHandler (w http.ResponseWriter , r * http.Request ) {
221
+ func ( h * Handler ) responseHandler (w http.ResponseWriter , r * http.Request ) {
216
222
id , kind , err := parsePath (r .URL .Path )
217
223
if err != nil {
224
+ h .logger .Errorf ("Runtime response error: %v" , err )
218
225
w .WriteHeader (http .StatusBadRequest )
219
226
w .Write ([]byte (err .Error ()))
220
227
return
221
228
}
222
229
223
230
data , err := ioutil .ReadAll (r .Body )
224
231
if err != nil {
225
- log .Printf ("! %s\n " , err )
232
+ h .logger .Errorf ("Cannot read response data: %v" , err )
233
+ w .WriteHeader (http .StatusBadRequest )
234
+ w .Write ([]byte (err .Error ()))
226
235
return
227
236
}
228
237
defer r .Body .Close ()
@@ -260,16 +269,16 @@ func ping(w http.ResponseWriter, r *http.Request) {
260
269
w .Write ([]byte ("pong" ))
261
270
}
262
271
263
- func api () error {
272
+ func ( h * Handler ) internalAPI () error {
264
273
internalSocket , _ := os .LookupEnv ("AWS_LAMBDA_RUNTIME_API" )
265
274
if internalSocket == "" {
266
275
return fmt .Errorf ("AWS_LAMBDA_RUNTIME_API is not set" )
267
276
}
268
277
269
278
apiRouter := http .NewServeMux ()
270
- apiRouter .HandleFunc (awsEndpoint + "/init/error" , initError )
279
+ apiRouter .HandleFunc (awsEndpoint + "/init/error" , h . initError )
271
280
apiRouter .HandleFunc (awsEndpoint + "/invocation/next" , getTask )
272
- apiRouter .HandleFunc (awsEndpoint + "/invocation/" , responseHandler )
281
+ apiRouter .HandleFunc (awsEndpoint + "/invocation/" , h . responseHandler )
273
282
apiRouter .HandleFunc ("/2018-06-01/ping" , ping )
274
283
275
284
err := http .ListenAndServe (internalSocket , apiRouter )
@@ -280,35 +289,37 @@ func api() error {
280
289
}
281
290
282
291
func main () {
292
+ logger := logger .New ()
293
+
283
294
// parse env
284
295
var spec Specification
285
296
if err := envconfig .Process ("" , & spec ); err != nil {
286
- log .Fatalf ("Cannot process env variables: %v" , err )
297
+ logger .Fatalf ("Cannot process env variables: %v" , err )
287
298
}
288
- log .Printf ("%+v\n " , spec )
289
-
290
- log .Println ("Setting up runtime env" )
299
+ logger .Debugf ("Runtime specification: %+v" , spec )
300
+ logger .Debug ("Setting up runtime env" )
291
301
if err := setupEnv (spec .InternalAPIport ); err != nil {
292
- log .Fatalf ("Cannot setup runime env: %v" , err )
302
+ logger .Fatalf ("Cannot setup runime env: %v" , err )
293
303
}
294
304
295
305
// create converter
296
306
conv , err := converter .New (spec .ResponseFormat )
297
307
if err != nil {
298
- log .Fatalf ("Cannot create converter: %v" , err )
308
+ logger .Fatalf ("Cannot create converter: %v" , err )
299
309
}
300
310
301
311
// start metrics reporter
302
312
mr , err := metrics .StatsExporter ()
303
313
if err != nil {
304
- log .Fatalf ("Cannot start stats exporter: %v" , err )
314
+ logger .Fatalf ("Cannot start stats exporter: %v" , err )
305
315
}
306
316
307
317
// setup sender
308
318
handler := Handler {
309
319
sender : sender .New (spec .Sink , conv .ContentType ()),
310
320
converter : conv ,
311
321
reporter : mr ,
322
+ logger : logger ,
312
323
requestSizeLimit : spec .RequestSizeLimit ,
313
324
functionTTL : spec .FunctionTTL ,
314
325
}
@@ -319,33 +330,33 @@ func main() {
319
330
defer close (tasks )
320
331
321
332
// start Lambda API
322
- log . Println ("Starting API" )
333
+ logger . Debug ("Starting API" )
323
334
go func () {
324
- if err := api (); err != nil {
325
- log .Fatalf ("Runtime internal API error: %v" , err )
335
+ if err := handler . internalAPI (); err != nil {
336
+ logger .Fatalf ("Runtime internal API error: %v" , err )
326
337
}
327
338
}()
328
339
329
340
// start invokers
330
341
for i := 0 ; i < spec .NumberOfinvokers ; i ++ {
331
- log . Println ("Starting bootstrap" , i + 1 )
342
+ logger . Debug ("Starting bootstrap" , i + 1 )
332
343
go func (i int ) {
333
344
cmd := exec .Command ("sh" , "-c" , environment ["LAMBDA_TASK_ROOT" ]+ "/bootstrap" )
334
345
cmd .Env = append (os .Environ (), fmt .Sprintf ("BOOTSTRAP_INDEX=%d" , i ))
335
346
cmd .Stdout = os .Stdout
336
347
cmd .Stderr = os .Stderr
337
348
if err := cmd .Run (); err != nil {
338
- log .Fatalf ("Cannot start bootstrap process: %v" , err )
349
+ logger .Fatalf ("Cannot start bootstrap process: %v" , err )
339
350
}
340
351
}(i )
341
352
}
342
353
343
354
// start external API
344
355
taskRouter := http .NewServeMux ()
345
356
taskRouter .Handle ("/" , http .HandlerFunc (handler .serve ))
346
- log . Println ( "Listening... " )
357
+ logger . Info ( "Runtime initialized " )
347
358
err = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
348
359
if err != nil && err != http .ErrServerClosed {
349
- log .Fatalf ("Runtime external API error: %v" , err )
360
+ logger .Fatalf ("Runtime external API error: %v" , err )
350
361
}
351
362
}
0 commit comments