2
2
//
3
3
// This source file is part of the SwiftAWSLambdaRuntime open source project
4
4
//
5
- // Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5
+ // Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
6
6
// Licensed under Apache License v2.0
7
7
//
8
8
// See LICENSE.txt for license information
@@ -97,9 +97,16 @@ internal final class HTTPClient {
97
97
private func connect( ) -> EventLoopFuture < Channel > {
98
98
let bootstrap = ClientBootstrap ( group: self . eventLoop)
99
99
. channelInitializer { channel in
100
- channel. pipeline. addHTTPClientHandlers ( ) . flatMap {
101
- channel. pipeline. addHandlers ( [ HTTPHandler ( keepAlive: self . configuration. keepAlive) ,
102
- UnaryHandler ( keepAlive: self . configuration. keepAlive) ] )
100
+ do {
101
+ try channel. pipeline. syncOperations. addHTTPClientHandlers ( )
102
+ // Lambda quotas... An invocation payload is maximal 6MB in size:
103
+ // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104
+ try channel. pipeline. syncOperations. addHandler (
105
+ NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106
+ try channel. pipeline. syncOperations. addHandler ( LambdaChannelHandler ( ) )
107
+ return channel. eventLoop. makeSucceededFuture ( ( ) )
108
+ } catch {
109
+ return channel. eventLoop. makeFailedFuture ( error)
103
110
}
104
111
}
105
112
@@ -131,10 +138,10 @@ internal final class HTTPClient {
131
138
}
132
139
133
140
internal struct Response : Equatable {
134
- public var version : HTTPVersion
135
- public var status : HTTPResponseStatus
136
- public var headers : HTTPHeaders
137
- public var body : ByteBuffer ?
141
+ var version : HTTPVersion
142
+ var status : HTTPResponseStatus
143
+ var headers : HTTPHeaders
144
+ var body : ByteBuffer ?
138
145
}
139
146
140
147
internal enum Errors : Error {
@@ -149,133 +156,77 @@ internal final class HTTPClient {
149
156
}
150
157
}
151
158
152
- private final class HTTPHandler : ChannelDuplexHandler {
153
- typealias OutboundIn = HTTPClient . Request
154
- typealias InboundOut = HTTPClient . Response
155
- typealias InboundIn = HTTPClientResponsePart
159
+ // no need in locks since we validate only one request can run at a time
160
+ private final class LambdaChannelHandler : ChannelDuplexHandler {
161
+ typealias InboundIn = NIOHTTPClientResponseFull
162
+ typealias OutboundIn = HTTPRequestWrapper
156
163
typealias OutboundOut = HTTPClientRequestPart
157
164
158
- private let keepAlive : Bool
159
- private var readState : ReadState = . idle
160
-
161
- init ( keepAlive: Bool ) {
162
- self . keepAlive = keepAlive
163
- }
164
-
165
- func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
166
- let request = unwrapOutboundIn ( data)
167
-
168
- var head = HTTPRequestHead ( version: . init( major: 1 , minor: 1 ) , method: request. method, uri: request. url, headers: request. headers)
169
- head. headers. add ( name: " host " , value: request. targetHost)
170
- switch request. method {
171
- case . POST, . PUT:
172
- head. headers. add ( name: " content-length " , value: String ( request. body? . readableBytes ?? 0 ) )
173
- default :
174
- break
175
- }
176
-
177
- // We don't add a "Connection" header here if we want to keep the connection open,
178
- // HTTP/1.1 defines specifies the following in RFC 2616, Section 8.1.2.1:
179
- //
180
- // An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to
181
- // maintain a persistent connection unless a Connection header including
182
- // the connection-token "close" was sent in the request. If the server
183
- // chooses to close the connection immediately after sending the
184
- // response, it SHOULD send a Connection header including the
185
- // connection-token close.
186
- //
187
- // See also UnaryHandler.channelRead below.
188
- if !self . keepAlive {
189
- head. headers. add ( name: " connection " , value: " close " )
190
- }
191
-
192
- context. write ( self . wrapOutboundOut ( HTTPClientRequestPart . head ( head) ) ) . flatMap { _ -> EventLoopFuture < Void > in
193
- if let body = request. body {
194
- return context. writeAndFlush ( self . wrapOutboundOut ( HTTPClientRequestPart . body ( . byteBuffer( body) ) ) )
195
- } else {
196
- context. flush ( )
197
- return context. eventLoop. makeSucceededFuture ( ( ) )
198
- }
199
- } . cascade ( to: promise)
200
- }
201
-
202
- func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
203
- let response = unwrapInboundIn ( data)
204
-
205
- switch response {
206
- case . head( let head) :
207
- guard case . idle = self . readState else {
208
- preconditionFailure ( " invalid read state \( self . readState) " )
209
- }
210
- self . readState = . head( head)
211
- case . body( var bodyPart) :
212
- switch self . readState {
213
- case . head( let head) :
214
- self . readState = . body( head, bodyPart)
215
- case . body( let head, var body) :
216
- body. writeBuffer ( & bodyPart)
217
- self . readState = . body( head, body)
218
- default :
219
- preconditionFailure ( " invalid read state \( self . readState) " )
220
- }
221
- case . end:
222
- switch self . readState {
223
- case . head( let head) :
224
- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: nil ) ) )
225
- self . readState = . idle
226
- case . body( let head, let body) :
227
- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: body) ) )
228
- self . readState = . idle
229
- default :
230
- preconditionFailure ( " invalid read state \( self . readState) " )
231
- }
232
- }
233
- }
234
-
235
- private enum ReadState {
165
+ enum State {
236
166
case idle
237
- case head ( HTTPResponseHead )
238
- case body ( HTTPResponseHead , ByteBuffer )
167
+ case running ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? )
168
+ case waitForConnectionClose ( HTTPClient . Response , EventLoopPromise < HTTPClient . Response > )
239
169
}
240
- }
241
-
242
- // no need in locks since we validate only one request can run at a time
243
- private final class UnaryHandler : ChannelDuplexHandler {
244
- typealias OutboundIn = HTTPRequestWrapper
245
- typealias InboundIn = HTTPClient . Response
246
- typealias OutboundOut = HTTPClient . Request
247
-
248
- private let keepAlive : Bool
249
170
250
- private var pending : ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? ) ?
171
+ private var state : State = . idle
251
172
private var lastError : Error ?
252
173
253
- init ( keepAlive: Bool ) {
254
- self . keepAlive = keepAlive
255
- }
174
+ init ( ) { }
256
175
257
176
func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
258
- guard self . pending == nil else {
177
+ guard case . idle = self . state else {
259
178
preconditionFailure ( " invalid state, outstanding request " )
260
179
}
261
180
let wrapper = unwrapOutboundIn ( data)
181
+
182
+ var head = HTTPRequestHead (
183
+ version: . http1_1,
184
+ method: wrapper. request. method,
185
+ uri: wrapper. request. url,
186
+ headers: wrapper. request. headers
187
+ )
188
+ head. headers. add ( name: " host " , value: wrapper. request. targetHost)
189
+ switch head. method {
190
+ case . POST, . PUT:
191
+ head. headers. add ( name: " content-length " , value: String ( wrapper. request. body? . readableBytes ?? 0 ) )
192
+ default :
193
+ break
194
+ }
195
+
262
196
let timeoutTask = wrapper. request. timeout. map {
263
197
context. eventLoop. scheduleTask ( in: $0) {
264
- if self . pending != nil {
265
- context . pipeline . fireErrorCaught ( HTTPClient . Errors . timeout )
198
+ guard case . running = self . state else {
199
+ preconditionFailure ( " invalid state " )
266
200
}
201
+
202
+ context. pipeline. fireErrorCaught ( HTTPClient . Errors. timeout)
267
203
}
268
204
}
269
- self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
270
- context. writeAndFlush ( wrapOutboundOut ( wrapper. request) , promise: promise)
205
+ self . state = . running( promise: wrapper. promise, timeout: timeoutTask)
206
+
207
+ context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
208
+ if let body = wrapper. request. body {
209
+ context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
210
+ }
211
+ context. writeAndFlush ( wrapOutboundOut ( . end( nil ) ) , promise: promise)
271
212
}
272
213
273
214
func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
274
- let response = unwrapInboundIn ( data)
275
- guard let pending = self . pending else {
215
+ guard case . running( let promise, let timeout) = self . state else {
276
216
preconditionFailure ( " invalid state, no pending request " )
277
217
}
278
218
219
+ let response = unwrapInboundIn ( data)
220
+
221
+ let httpResponse = HTTPClient . Response (
222
+ version: response. head. version,
223
+ status: response. head. status,
224
+ headers: response. head. headers,
225
+ body: response. body
226
+ )
227
+
228
+ timeout? . cancel ( )
229
+
279
230
// As defined in RFC 7230 Section 6.3:
280
231
// HTTP/1.1 defaults to the use of "persistent connections", allowing
281
232
// multiple requests and responses to be carried over a single
@@ -285,14 +236,31 @@ private final class UnaryHandler: ChannelDuplexHandler {
285
236
//
286
237
// That's why we only assume the connection shall be closed if we receive
287
238
// a "connection = close" header.
288
- let serverCloseConnection = response. headers. first ( name: " connection " ) ? . lowercased ( ) == " close "
289
-
290
- if !self . keepAlive || serverCloseConnection || response. version != . init( major: 1 , minor: 1 ) {
291
- pending. promise. futureResult. whenComplete { _ in
292
- _ = context. channel. close ( )
293
- }
239
+ let serverCloseConnection =
240
+ response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
241
+
242
+ let closeConnection = serverCloseConnection || response. head. version != . http1_1
243
+
244
+ if closeConnection {
245
+ // If we were succeeding the request promise here directly and closing the connection
246
+ // after succeeding the promise we may run into a race condition:
247
+ //
248
+ // The lambda runtime will ask for the next work item directly after a succeeded post
249
+ // response request. The desire for the next work item might be faster than the attempt
250
+ // to close the connection. This will lead to a situation where we try to the connection
251
+ // but the next request has already been scheduled on the connection that we want to
252
+ // close. For this reason we postpone succeeding the promise until the connection has
253
+ // been closed. This codepath will only be hit in the very, very unlikely event of the
254
+ // Lambda control plane demanding to close connection. (It's more or less only
255
+ // implemented to support http1.1 correctly.) This behavior is ensured with the test
256
+ // `LambdaTest.testNoKeepAliveServer`.
257
+ self . state = . waitForConnectionClose( httpResponse, promise)
258
+ _ = context. channel. close ( )
259
+ return
260
+ } else {
261
+ self . state = . idle
262
+ promise. succeed ( httpResponse)
294
263
}
295
- self . completeWith ( . success( response) )
296
264
}
297
265
298
266
func errorCaught( context: ChannelHandlerContext , error: Error ) {
@@ -303,36 +271,44 @@ private final class UnaryHandler: ChannelDuplexHandler {
303
271
304
272
func channelInactive( context: ChannelHandlerContext ) {
305
273
// fail any pending responses with last error or assume peer disconnected
306
- if self . pending != nil {
307
- let error = self . lastError ?? HTTPClient . Errors. connectionResetByPeer
308
- self . completeWith ( . failure( error) )
309
- }
310
274
context. fireChannelInactive ( )
275
+
276
+ switch self . state {
277
+ case . idle:
278
+ break
279
+ case . running( let promise, let timeout) :
280
+ self . state = . idle
281
+ timeout? . cancel ( )
282
+ promise. fail ( self . lastError ?? HTTPClient . Errors. connectionResetByPeer)
283
+
284
+ case . waitForConnectionClose( let response, let promise) :
285
+ self . state = . idle
286
+ promise. succeed ( response)
287
+ }
311
288
}
312
289
313
290
func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
314
291
switch event {
315
292
case is RequestCancelEvent :
316
- if self . pending != nil {
317
- self . completeWith ( . failure( HTTPClient . Errors. cancelled) )
293
+ switch self . state {
294
+ case . idle:
295
+ break
296
+ case . running( let promise, let timeout) :
297
+ self . state = . idle
298
+ timeout? . cancel ( )
299
+ promise. fail ( HTTPClient . Errors. cancelled)
300
+
318
301
// after the cancel error has been send, we want to close the connection so
319
302
// that no more packets can be read on this connection.
320
303
_ = context. channel. close ( )
304
+ case . waitForConnectionClose( _, let promise) :
305
+ self . state = . idle
306
+ promise. fail ( HTTPClient . Errors. cancelled)
321
307
}
322
308
default :
323
309
context. triggerUserOutboundEvent ( event, promise: promise)
324
310
}
325
311
}
326
-
327
- private func completeWith( _ result: Result < HTTPClient . Response , Error > ) {
328
- guard let pending = self . pending else {
329
- preconditionFailure ( " invalid state, no pending request " )
330
- }
331
- self . pending = nil
332
- self . lastError = nil
333
- pending. timeout? . cancel ( )
334
- pending. promise. completeWith ( result)
335
- }
336
312
}
337
313
338
314
private struct HTTPRequestWrapper {
0 commit comments