@@ -29,24 +29,26 @@ import (
29
29
30
30
const (
31
31
requestSizeLimit = 67108864
32
+ functionTTL = 5e+9 // Funtions deadline, 5 seconds
32
33
)
33
34
34
35
type message struct {
35
- id string
36
- data []byte
36
+ id string
37
+ deadline int64
38
+ data []byte
37
39
}
38
40
39
41
var (
40
42
tasks chan message
41
- results chan message
43
+ results map [ string ] chan message
42
44
43
45
awsEndpoint = "/2018-06-01/runtime"
44
46
environment = map [string ]string {
45
47
"PATH" : "/usr/local/bin:/usr/bin/:/bin:/opt/bin" ,
46
48
"LD_LIBRARY_PATH" : "/lib64:/usr/lib64:$LAMBDA_RUNTIME_DIR:$LAMBDA_RUNTIME_DIR/lib:$LAMBDA_TASK_ROOT:$LAMBDA_TASK_ROOT/lib:/opt/lib" ,
47
49
"AWS_LAMBDA_RUNTIME_API" : "127.0.0.1" ,
48
50
49
- // Some dummy values required by Rust client
51
+ // Some dummy values
50
52
"AWS_LAMBDA_FUNCTION_NAME" : "foo" ,
51
53
"AWS_LAMBDA_FUNCTION_MEMORY_SIZE" : "128" ,
52
54
"AWS_LAMBDA_FUNCTION_VERSION" : "0.0.1" ,
@@ -75,17 +77,31 @@ func newTask(w http.ResponseWriter, r *http.Request) {
75
77
}
76
78
defer r .Body .Close ()
77
79
78
- id := strconv .Itoa (int (time .Now ().UnixNano ()))
79
- fmt .Printf ("<- %s %s\n " , id , body )
80
- tasks <- message {
81
- id : id ,
82
- data : body ,
80
+ now := time .Now ().UnixNano ()
81
+ task := message {
82
+ id : fmt .Sprintf ("%d" , now ),
83
+ deadline : now + functionTTL ,
84
+ data : body ,
85
+ }
86
+ fmt .Printf ("<- %s %s\n " , task .id , task .data )
87
+
88
+ results [task .id ] = make (chan message )
89
+ defer close (results [task .id ])
90
+
91
+ tasks <- task
92
+
93
+ select {
94
+ case <- time .After (time .Duration (functionTTL )):
95
+ fmt .Printf ("-> ! %s Deadline is reached\n " , task .id )
96
+ w .WriteHeader (http .StatusRequestTimeout )
97
+ w .Write ([]byte ("Function deadline is reached" ))
98
+ case result := <- results [task .id ]:
99
+ fmt .Printf ("Response in queue %s\n " , result .id )
100
+ fmt .Printf ("-> %s %s\n " , result .id , result .data )
101
+ w .WriteHeader (http .StatusOK )
102
+ w .Write (result .data )
83
103
}
84
104
85
- response := <- results
86
- fmt .Printf ("-> %s %s\n " , response .id , response .data )
87
- w .WriteHeader (http .StatusOK )
88
- w .Write (response .data )
89
105
return
90
106
}
91
107
@@ -94,7 +110,7 @@ func getTask(w http.ResponseWriter, r *http.Request) {
94
110
95
111
// Dummy headers required by Rust client. Replace with something meaningful
96
112
w .Header ().Set ("Lambda-Runtime-Aws-Request-Id" , task .id )
97
- w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , "5543843233064023422" )
113
+ w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv . Itoa ( int ( task . deadline )) )
98
114
w .Header ().Set ("Lambda-Runtime-Invoked-Function-Arn" , "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime" )
99
115
w .Header ().Set ("Lambda-Runtime-Trace-Id" , "0" )
100
116
@@ -123,7 +139,7 @@ func postResult(w http.ResponseWriter, r *http.Request) {
123
139
}
124
140
defer r .Body .Close ()
125
141
126
- results <- message {
142
+ results [ id ] <- message {
127
143
id : id ,
128
144
data : data ,
129
145
}
@@ -139,8 +155,7 @@ func taskError(w http.ResponseWriter, r *http.Request) {
139
155
return
140
156
}
141
157
142
- fmt .Printf ("! %s %s\n " , id , data )
143
- results <- message {
158
+ results [id ] <- message {
144
159
id : id ,
145
160
data : data ,
146
161
}
@@ -166,11 +181,11 @@ func (h *maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
166
181
r .Body = http .MaxBytesReader (w , r .Body , h .n )
167
182
h .h .ServeHTTP (w , r )
168
183
}
184
+
169
185
func main () {
170
- tasks = make (chan message )
171
- results = make (chan message )
186
+ tasks = make (chan message , 100 )
187
+ results = make (map [ string ] chan message )
172
188
defer close (tasks )
173
- defer close (results )
174
189
175
190
fmt .Println ("Setup env" )
176
191
if err := setupEnv (); err != nil {
0 commit comments