Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Writable #1

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Browser.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use React\Http\Io\Sender;
use React\Http\Io\Transaction;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;
use InvalidArgumentException;
Expand Down Expand Up @@ -757,7 +758,7 @@ public function withResponseBuffer($maximumSize)
* @see self::withFollowRedirects()
* @see self::withRejectErrorResponse()
*/
private function withOptions(array $options)
public function withOptions(array $options)
{
$browser = clone $this;
$browser->transaction = $this->transaction->withOptions($options);
Expand All @@ -770,7 +771,7 @@ private function withOptions(array $options)
* @param string $url
* @param array $headers
* @param string|ReadableStreamInterface $body
* @return PromiseInterface<ResponseInterface,\Exception>
* @return PromiseInterface<ResponseInterface,\Exception,ConnectionInterface>
*/
private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '')
{
Expand Down
15 changes: 15 additions & 0 deletions src/Client/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe
});
}

protected function responseIsAnUpgradeResponse($response)
{
return
$response->hasHeader('Connection') &&
(in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) &&
(int) $response->getStatusCode() === 101;
}

public function write($data)
{
if (!$this->isWritable()) {
Expand Down Expand Up @@ -140,6 +148,13 @@ public function handleData($data)
try {
$response = gPsr\parse_response($this->buffer);
$bodyChunk = (string) $response->getBody();

if ($this->responseIsAnUpgradeResponse($response)) {
$this->stream->removeListener('data', array($this, 'handleData'));

$this->emit('upgrade', array($this->stream, $response, $this));
return;
}
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}
Expand Down
8 changes: 8 additions & 0 deletions src/Client/RequestData.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,12 @@ private function getAuthHeaders()

return array();
}

/**
* @return array
*/
public function getHeaders()
{
return $this->headers;
}
}
56 changes: 56 additions & 0 deletions src/Client/UpgradedResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

namespace React\Http\Client;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Stream\DuplexStreamInterface;

class UpgradedResponse
{
/**
* @var DuplexStreamInterface
*/
private $connection;

/**
* @var ResponseInterface
*/
private $response;

/**
* @var RequestInterface
*/
private $request;

public function __construct(DuplexStreamInterface $connection, ResponseInterface $response, RequestInterface $request)
{
$this->connection = $connection;
$this->response = $response;
$this->request = $request;
}

/**
* @return DuplexStreamInterface
*/
public function getConnection()
{
return $this->connection;
}

/**
* @return ResponseInterface
*/
public function getResponse()
{
return $this->response;
}

/**
* @return RequestInterface
*/
public function getRequest()
{
return $this->request;
}
}
66 changes: 43 additions & 23 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
use React\EventLoop\LoopInterface;
use React\Http\Client\Client as HttpClient;
use React\Http\Message\Response;
use React\Http\Client\UpgradedResponse;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;

/**
Expand Down Expand Up @@ -70,33 +72,14 @@ public function __construct(HttpClient $http)
*
* @internal
* @param RequestInterface $request
* @return PromiseInterface Promise<ResponseInterface, Exception>
* @param bool $upgrade Configures the sender to listen for upgrade
* @return PromiseInterface Promise<ResponseInterface, ConnectionInterface, Exception>
*/
public function send(RequestInterface $request)
public function send(RequestInterface $request, $upgrade = false)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

$requestStream = $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion());
list($size, $requestStream) = $this->createRequestStream($request);

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is cancelled
Expand All @@ -122,6 +105,16 @@ public function send(RequestInterface $request)
$deferred->resolve($response->withBody(new ReadableBodyStream($body, $length)));
});

/**
* We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately
* This is useful for websocket connections that requires an HTTP Upgrade.
*/
if ($upgrade) {
$requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) {
$deferred->resolve(new UpgradedResponse($socket, $response, $request));
});
}

if ($body instanceof ReadableStreamInterface) {
if ($body->isReadable()) {
// length unknown => apply chunked transfer-encoding
Expand Down Expand Up @@ -157,4 +150,31 @@ public function send(RequestInterface $request)

return $deferred->promise();
}

protected function createRequestStream(RequestInterface $request)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()));
}
}
9 changes: 7 additions & 2 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Transaction

private $streaming = false;

// Determines whether to return the connection of an upgrade or not
private $upgrade = false;

private $maximumSize = 16777216; // 16 MiB = 2^24 bytes

public function __construct(Sender $sender, LoopInterface $loop)
Expand Down Expand Up @@ -81,7 +84,7 @@ public function send(RequestInterface $request)

$loop = $this->loop;
$this->next($request, $deferred)->then(
function (ResponseInterface $response) use ($deferred, $loop, &$timeout) {
function ($response) use ($deferred, $loop, &$timeout) {
if (isset($deferred->timeout)) {
$loop->cancelTimer($deferred->timeout);
unset($deferred->timeout);
Expand Down Expand Up @@ -144,7 +147,9 @@ private function next(RequestInterface $request, Deferred $deferred)
$that = $this;
++$deferred->numRequests;

$promise = $this->sender->send($request);
$promise = $this->sender->send($request, $this->upgrade);

if ($this->upgrade) return $promise;

if (!$this->streaming) {
$promise = $promise->then(function ($response) use ($deferred, $that) {
Expand Down