Skip to content

Commit ef1f43e

Browse files
committed
Consistently close underlying connection when response stream closes
1 parent 1fbe922 commit ef1f43e

File tree

3 files changed

+448
-165
lines changed

3 files changed

+448
-165
lines changed

src/Io/ClientRequestStream.php

+44-40
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* @event response
1717
* @event drain
1818
* @event error
19-
* @event end
19+
* @event close
2020
* @internal
2121
*/
2222
class ClientRequestStream extends EventEmitter implements WritableStreamInterface
@@ -33,9 +33,11 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
3333
private $request;
3434

3535
/** @var ?ConnectionInterface */
36-
private $stream;
36+
private $connection;
37+
38+
/** @var string */
39+
private $buffer = '';
3740

38-
private $buffer;
3941
private $responseFactory;
4042
private $state = self::STATE_INIT;
4143
private $ended = false;
@@ -58,22 +60,22 @@ private function writeHead()
5860
$this->state = self::STATE_WRITING_HEAD;
5961

6062
$request = $this->request;
61-
$streamRef = &$this->stream;
63+
$connectionRef = &$this->connection;
6264
$stateRef = &$this->state;
6365
$pendingWrites = &$this->pendingWrites;
6466
$that = $this;
6567

6668
$promise = $this->connect();
6769
$promise->then(
68-
function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &$pendingWrites, $that) {
69-
$streamRef = $stream;
70-
assert($streamRef instanceof ConnectionInterface);
70+
function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
71+
$connectionRef = $connection;
72+
assert($connectionRef instanceof ConnectionInterface);
7173

72-
$stream->on('drain', array($that, 'handleDrain'));
73-
$stream->on('data', array($that, 'handleData'));
74-
$stream->on('end', array($that, 'handleEnd'));
75-
$stream->on('error', array($that, 'handleError'));
76-
$stream->on('close', array($that, 'handleClose'));
74+
$connection->on('drain', array($that, 'handleDrain'));
75+
$connection->on('data', array($that, 'handleData'));
76+
$connection->on('end', array($that, 'handleEnd'));
77+
$connection->on('error', array($that, 'handleError'));
78+
$connection->on('close', array($that, 'close'));
7779

7880
assert($request instanceof RequestInterface);
7981
$headers = "{$request->getMethod()} {$request->getRequestTarget()} HTTP/{$request->getProtocolVersion()}\r\n";
@@ -83,7 +85,7 @@ function (ConnectionInterface $stream) use ($request, &$streamRef, &$stateRef, &
8385
}
8486
}
8587

86-
$more = $stream->write($headers . "\r\n" . $pendingWrites);
88+
$more = $connection->write($headers . "\r\n" . $pendingWrites);
8789

8890
assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD);
8991
$stateRef = ClientRequestStream::STATE_HEAD_WRITTEN;
@@ -113,7 +115,7 @@ public function write($data)
113115

114116
// write directly to connection stream if already available
115117
if (self::STATE_HEAD_WRITTEN <= $this->state) {
116-
return $this->stream->write($data);
118+
return $this->connection->write($data);
117119
}
118120

119121
// otherwise buffer and try to establish connection
@@ -157,26 +159,28 @@ public function handleData($data)
157159
$response = gPsr\parse_response($this->buffer);
158160
$bodyChunk = (string) $response->getBody();
159161
} catch (\InvalidArgumentException $exception) {
160-
$this->emit('error', array($exception));
161-
}
162-
163-
$this->buffer = null;
164-
165-
$this->stream->removeListener('drain', array($this, 'handleDrain'));
166-
$this->stream->removeListener('data', array($this, 'handleData'));
167-
$this->stream->removeListener('end', array($this, 'handleEnd'));
168-
$this->stream->removeListener('error', array($this, 'handleError'));
169-
$this->stream->removeListener('close', array($this, 'handleClose'));
170-
171-
if (!isset($response)) {
162+
$this->closeError($exception);
172163
return;
173164
}
174165

175-
$this->stream->on('close', array($this, 'handleClose'));
176-
177-
assert($response instanceof ResponseInterface);
178-
assert($this->stream instanceof ConnectionInterface);
179-
$body = $this->stream;
166+
// response headers successfully received => remove listeners for connection events
167+
$connection = $this->connection;
168+
assert($connection instanceof ConnectionInterface);
169+
$connection->removeListener('drain', array($this, 'handleDrain'));
170+
$connection->removeListener('data', array($this, 'handleData'));
171+
$connection->removeListener('end', array($this, 'handleEnd'));
172+
$connection->removeListener('error', array($this, 'handleError'));
173+
$connection->removeListener('close', array($this, 'close'));
174+
$this->connection = null;
175+
$this->buffer = '';
176+
177+
// take control over connection handling and close connection once response body closes
178+
$that = $this;
179+
$input = $body = new CloseProtectionStream($connection);
180+
$input->on('close', function () use ($connection, $that) {
181+
$connection->close();
182+
$that->close();
183+
});
180184

181185
// determine length of response body
182186
$length = null;
@@ -194,7 +198,11 @@ public function handleData($data)
194198
$this->emit('response', array($response, $body));
195199

196200
// re-emit HTTP response body to trigger body parsing if parts of it are buffered
197-
$this->stream->emit('data', array($bodyChunk));
201+
if ($bodyChunk !== '') {
202+
$input->handleData($bodyChunk);
203+
} elseif ($length === 0) {
204+
$input->handleEnd();
205+
}
198206
}
199207
}
200208

@@ -216,12 +224,6 @@ public function handleError(\Exception $error)
216224
));
217225
}
218226

219-
/** @internal */
220-
public function handleClose()
221-
{
222-
$this->close();
223-
}
224-
225227
/** @internal */
226228
public function closeError(\Exception $error)
227229
{
@@ -240,9 +242,11 @@ public function close()
240242

241243
$this->state = self::STATE_END;
242244
$this->pendingWrites = '';
245+
$this->buffer = '';
243246

244-
if ($this->stream) {
245-
$this->stream->close();
247+
if ($this->connection instanceof ConnectionInterface) {
248+
$this->connection->close();
249+
$this->connection = null;
246250
}
247251

248252
$this->emit('close');

0 commit comments

Comments
 (0)