diff --git a/src/Browser.php b/src/Browser.php index 72847f66..a2bcccd0 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -11,6 +11,7 @@ use React\Http\Io\Sender; use React\Http\Io\Transaction; use React\Promise\PromiseInterface; +use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; use InvalidArgumentException; @@ -757,7 +758,7 @@ public function withResponseBuffer($maximumSize) * @see self::withFollowRedirects() * @see self::withRejectErrorResponse() */ - private function withOptions(array $options) + public function withOptions(array $options) { $browser = clone $this; $browser->transaction = $this->transaction->withOptions($options); @@ -770,7 +771,7 @@ private function withOptions(array $options) * @param string $url * @param array $headers * @param string|ReadableStreamInterface $body - * @return PromiseInterface + * @return PromiseInterface */ private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '') { diff --git a/src/Client/Request.php b/src/Client/Request.php index 51e03313..a0c8e7b1 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -89,6 +89,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe }); } + protected function responseIsAnUpgradeResponse($response) + { + return + $response->hasHeader('Connection') && + (in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) && + (int) $response->getStatusCode() === 101; + } + public function write($data) { if (!$this->isWritable()) { @@ -140,6 +148,13 @@ public function handleData($data) try { $response = gPsr\parse_response($this->buffer); $bodyChunk = (string) $response->getBody(); + + if ($this->responseIsAnUpgradeResponse($response)) { + $this->stream->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->stream, $response, $this)); + return; + } } catch (\InvalidArgumentException $exception) { $this->emit('error', array($exception)); } diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php index a5908a08..c3b0f685 100644 --- a/src/Client/RequestData.php +++ b/src/Client/RequestData.php @@ -125,4 +125,12 @@ private function getAuthHeaders() return array(); } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } } diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php new file mode 100644 index 00000000..8659b304 --- /dev/null +++ b/src/Client/UpgradedResponse.php @@ -0,0 +1,56 @@ +connection = $connection; + $this->response = $response; + $this->request = $request; + } + + /** + * @return DuplexStreamInterface + */ + public function getConnection() + { + return $this->connection; + } + + /** + * @return ResponseInterface + */ + public function getResponse() + { + return $this->response; + } + + /** + * @return RequestInterface + */ + public function getRequest() + { + return $this->request; + } +} \ No newline at end of file diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 2f04c797..f0547ae8 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -7,9 +7,11 @@ use React\EventLoop\LoopInterface; use React\Http\Client\Client as HttpClient; use React\Http\Message\Response; +use React\Http\Client\UpgradedResponse; use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\ConnectorInterface; +use React\Stream\DuplexStreamInterface; use React\Stream\ReadableStreamInterface; /** @@ -70,33 +72,14 @@ public function __construct(HttpClient $http) * * @internal * @param RequestInterface $request - * @return PromiseInterface Promise + * @param bool $upgrade Configures the sender to listen for upgrade + * @return PromiseInterface Promise */ - public function send(RequestInterface $request) + public function send(RequestInterface $request, $upgrade = false) { $body = $request->getBody(); - $size = $body->getSize(); - - if ($size !== null && $size !== 0) { - // automatically assign a "Content-Length" request header if the body size is known and non-empty - $request = $request->withHeader('Content-Length', (string)$size); - } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { - // only assign a "Content-Length: 0" request header if the body is expected for certain methods - $request = $request->withHeader('Content-Length', '0'); - } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { - // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown - $request = $request->withHeader('Transfer-Encoding', 'chunked'); - } else { - // do not use chunked encoding if size is known or if this is an empty request body - $size = 0; - } - - $headers = array(); - foreach ($request->getHeaders() as $name => $values) { - $headers[$name] = implode(', ', $values); - } - $requestStream = $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()); + list($size, $requestStream) = $this->createRequestStream($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled @@ -122,6 +105,16 @@ public function send(RequestInterface $request) $deferred->resolve($response->withBody(new ReadableBodyStream($body, $length))); }); + /** + * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately + * This is useful for websocket connections that requires an HTTP Upgrade. + */ + if ($upgrade) { + $requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $deferred->resolve(new UpgradedResponse($socket, $response, $request)); + }); + } + if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { // length unknown => apply chunked transfer-encoding @@ -157,4 +150,31 @@ public function send(RequestInterface $request) return $deferred->promise(); } + + protected function createRequestStream(RequestInterface $request) + { + $body = $request->getBody(); + $size = $body->getSize(); + + if ($size !== null && $size !== 0) { + // automatically assign a "Content-Length" request header if the body size is known and non-empty + $request = $request->withHeader('Content-Length', (string)$size); + } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { + // only assign a "Content-Length: 0" request header if the body is expected for certain methods + $request = $request->withHeader('Content-Length', '0'); + } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { + // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown + $request = $request->withHeader('Transfer-Encoding', 'chunked'); + } else { + // do not use chunked encoding if size is known or if this is an empty request body + $size = 0; + } + + $headers = array(); + foreach ($request->getHeaders() as $name => $values) { + $headers[$name] = implode(', ', $values); + } + + return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion())); + } } diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index 7bf7008f..583fb83b 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -35,6 +35,9 @@ class Transaction private $streaming = false; + // Determines whether to return the connection of an upgrade or not + private $upgrade = false; + private $maximumSize = 16777216; // 16 MiB = 2^24 bytes public function __construct(Sender $sender, LoopInterface $loop) @@ -81,7 +84,7 @@ public function send(RequestInterface $request) $loop = $this->loop; $this->next($request, $deferred)->then( - function (ResponseInterface $response) use ($deferred, $loop, &$timeout) { + function ($response) use ($deferred, $loop, &$timeout) { if (isset($deferred->timeout)) { $loop->cancelTimer($deferred->timeout); unset($deferred->timeout); @@ -144,7 +147,9 @@ private function next(RequestInterface $request, Deferred $deferred) $that = $this; ++$deferred->numRequests; - $promise = $this->sender->send($request); + $promise = $this->sender->send($request, $this->upgrade); + + if ($this->upgrade) return $promise; if (!$this->streaming) { $promise = $promise->then(function ($response) use ($deferred, $that) {