From 9dd2bc93d930272724bb01e84cfc62eb8ac27a67 Mon Sep 17 00:00:00 2001 From: bosunski Date: Sun, 26 Jul 2020 12:13:42 +0100 Subject: [PATCH 01/17] Begin Writable implementation --- src/Browser.php | 10 +- src/Client/Request.php | 321 +++++++++++++++++++++++++++++++++++++ src/Client/RequestData.php | 141 ++++++++++++++++ src/Io/Sender.php | 80 ++++++--- src/Io/Transaction.php | 2 + 5 files changed, 532 insertions(+), 22 deletions(-) create mode 100644 src/Client/Request.php create mode 100644 src/Client/RequestData.php diff --git a/src/Browser.php b/src/Browser.php index ad9187a6..e651cca6 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -10,6 +10,7 @@ use React\Http\Io\Transaction; use React\Http\Message\Request; use React\Promise\PromiseInterface; +use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; use InvalidArgumentException; @@ -425,6 +426,11 @@ public function requestStreaming($method, $url, $headers = array(), $body = '') return $this->withOptions(array('streaming' => true))->requestMayBeStreaming($method, $url, $headers, $body); } + public function requestUpgrade($method, $url, $headers = array(), $contents = '') + { + return $this->withOptions(array('upgrade' => true))->requestMayBeStreaming($method, $url, $headers, $contents); + } + /** * Changes the maximum timeout used for waiting for pending requests. * @@ -816,7 +822,7 @@ public function withoutHeader($header) * @see self::withFollowRedirects() * @see self::withRejectErrorResponse() */ - private function withOptions(array $options) + public function withOptions(array $options) { $browser = clone $this; $browser->transaction = $this->transaction->withOptions($options); @@ -829,7 +835,7 @@ private function withOptions(array $options) * @param string $url * @param array $headers * @param string|ReadableStreamInterface $body - * @return PromiseInterface + * @return PromiseInterface */ private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '') { diff --git a/src/Client/Request.php b/src/Client/Request.php new file mode 100644 index 00000000..88c7e9a5 --- /dev/null +++ b/src/Client/Request.php @@ -0,0 +1,321 @@ +connector = $connector; + $this->requestData = $requestData; + } + + public function isWritable() + { + return self::STATE_END > $this->state && !$this->ended; + } + + private function writeHead() + { + $this->state = self::STATE_WRITING_HEAD; + + $requestData = $this->requestData; + $streamRef = &$this->stream; + $stateRef = &$this->state; + $pendingWrites = &$this->pendingWrites; + $that = $this; + + $promise = $this->connect(); + $promise->then( + function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites, $that) { + $streamRef = $stream; + + $stream->on('drain', array($that, 'handleDrain')); + + $buffer = ''; + $headerParser = function ($data) use ($requestData, &$headerParser, $buffer, $streamRef, $that) { + $buffer .= $data; + + static $headerParsed = false; + + if (!$requestData->isUpgradeRequest() || $headerParsed || false == strpos($buffer, "\r\n\r\n")) { + return $that->handleData($data); + } + + $headerParsed = true; + + $response = gPsr\parse_response($buffer); + + $streamRef->removeListener('data', $headerParser); + + $that->emit('upgrade', array($streamRef, $response, $that)); + }; + + $stream->on('data', $headerParser); + + $stream->on('end', array($that, 'handleEnd')); + $stream->on('error', array($that, 'handleError')); + $stream->on('close', array($that, 'handleClose')); + + $headers = (string) $requestData; + + $more = $stream->write($headers . $pendingWrites); + + $stateRef = Request::STATE_HEAD_WRITTEN; + + // clear pending writes if non-empty + if ($pendingWrites !== '') { + $pendingWrites = ''; + + if ($more) { + $that->emit('drain'); + } + } + }, + array($this, 'closeError') + ); + + $this->on('close', function() use ($promise) { + $promise->cancel(); + }); + } + + public function write($data) + { + if (!$this->isWritable()) { + return false; + } + + // write directly to connection stream if already available + if (self::STATE_HEAD_WRITTEN <= $this->state) { + return $this->stream->write($data); + } + + // otherwise buffer and try to establish connection + $this->pendingWrites .= $data; + if (self::STATE_WRITING_HEAD > $this->state) { + $this->writeHead(); + } + + return false; + } + + public function end($data = null) + { + if (!$this->isWritable()) { + return; + } + + if (null !== $data) { + $this->write($data); + } else if (self::STATE_WRITING_HEAD > $this->state) { + $this->writeHead(); + } + + $this->ended = true; + } + + /** @internal */ + public function handleDrain() + { + $this->emit('drain'); + } + + /** @internal */ + public function handleData($data) + { + $this->buffer .= $data; + + // buffer until double CRLF (or double LF for compatibility with legacy servers) + if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { + try { + list($response, $bodyChunk) = $this->parseResponse($this->buffer); + } catch (\InvalidArgumentException $exception) { + $this->emit('error', array($exception)); + } + + $this->buffer = null; + + $this->stream->removeListener('drain', array($this, 'handleDrain')); + $this->stream->removeListener('data', array($this, 'handleData')); + $this->stream->removeListener('end', array($this, 'handleEnd')); + $this->stream->removeListener('error', array($this, 'handleError')); + $this->stream->removeListener('close', array($this, 'handleClose')); + + if (!isset($response)) { + return; + } + + $response->on('close', array($this, 'close')); + $that = $this; + $response->on('error', function (\Exception $error) use ($that) { + $that->closeError(new \RuntimeException( + "An error occured in the response", + 0, + $error + )); + }); + + $this->emit('response', array($response, $this)); + + $this->stream->emit('data', array($bodyChunk)); + } + } + + /** @internal */ + public function handleEnd() + { + $this->closeError(new \RuntimeException( + "Connection ended before receiving response" + )); + } + + /** @internal */ + public function handleError(\Exception $error) + { + $this->closeError(new \RuntimeException( + "An error occurred in the underlying stream", + 0, + $error + )); + } + + /** @internal */ + public function handleClose() + { + $this->close(); + } + + /** @internal */ + public function closeError(\Exception $error) + { + if (self::STATE_END <= $this->state) { + return; + } + $this->emit('error', array($error)); + $this->close(); + } + + public function close() + { + if (self::STATE_END <= $this->state) { + return; + } + + $this->state = self::STATE_END; + $this->pendingWrites = ''; + + if ($this->stream) { + $this->stream->close(); + } + + $this->emit('close'); + $this->removeAllListeners(); + } + + protected function parseResponse($data) + { + $psrResponse = gPsr\parse_response($data); + $headers = array_map(function($val) { + if (1 === count($val)) { + $val = $val[0]; + } + + return $val; + }, $psrResponse->getHeaders()); + + $factory = $this->getResponseFactory(); + + $response = $factory( + 'HTTP', + $psrResponse->getProtocolVersion(), + $psrResponse->getStatusCode(), + $psrResponse->getReasonPhrase(), + $headers + ); + + return array($response, (string)($psrResponse->getBody())); + } + + protected function connect() + { + $scheme = $this->requestData->getScheme(); + if ($scheme !== 'https' && $scheme !== 'http') { + return Promise\reject( + new \InvalidArgumentException('Invalid request URL given') + ); + } + + $host = $this->requestData->getHost(); + $port = $this->requestData->getPort(); + + if ($scheme === 'https') { + $host = 'tls://' . $host; + } + + return $this->connector + ->connect($host . ':' . $port); + } + + public function setResponseFactory($factory) + { + $this->responseFactory = $factory; + } + + public function getResponseFactory() + { + if (null === $factory = $this->responseFactory) { + $stream = $this->stream; + + $factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use ($stream) { + return new Response( + $stream, + $protocol, + $version, + $code, + $reasonPhrase, + $headers + ); + }; + + $this->responseFactory = $factory; + } + + return $factory; + } + + public function getRequestData() + { + return $this->requestData; + } +} diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php new file mode 100644 index 00000000..c37c0065 --- /dev/null +++ b/src/Client/RequestData.php @@ -0,0 +1,141 @@ +method = $method; + $this->url = $url; + $this->headers = $headers; + $this->protocolVersion = $protocolVersion; + } + + private function mergeDefaultheaders(array $headers) + { + $port = ($this->getDefaultPort() === $this->getPort()) ? '' : ":{$this->getPort()}"; + $connectionHeaders = ('1.1' === $this->protocolVersion) ? array('Connection' => 'close') : array(); + $authHeaders = $this->getAuthHeaders(); + + $defaults = array_merge( + array( + 'Host' => $this->getHost().$port, + 'User-Agent' => 'ReactPHP/1', + ), + $connectionHeaders, + $authHeaders + ); + + // remove all defaults that already exist in $headers + $lower = array_change_key_case($headers, CASE_LOWER); + foreach ($defaults as $key => $_) { + if (isset($lower[strtolower($key)])) { + unset($defaults[$key]); + } + } + + return array_merge($defaults, $headers); + } + + public function getScheme() + { + return parse_url($this->url, PHP_URL_SCHEME); + } + + public function getHost() + { + return parse_url($this->url, PHP_URL_HOST); + } + + public function getPort() + { + return (int) parse_url($this->url, PHP_URL_PORT) ?: $this->getDefaultPort(); + } + + public function getDefaultPort() + { + return ('https' === $this->getScheme()) ? 443 : 80; + } + + public function getPath() + { + $path = parse_url($this->url, PHP_URL_PATH); + $queryString = parse_url($this->url, PHP_URL_QUERY); + + // assume "/" path by default, but allow "OPTIONS *" + if ($path === null) { + $path = ($this->method === 'OPTIONS' && $queryString === null) ? '*': '/'; + } + if ($queryString !== null) { + $path .= '?' . $queryString; + } + + return $path; + } + + public function setProtocolVersion($version) + { + $this->protocolVersion = $version; + } + + public function __toString() + { + $headers = $this->mergeDefaultheaders($this->headers); + + $data = ''; + $data .= "{$this->method} {$this->getPath()} HTTP/{$this->protocolVersion}\r\n"; + foreach ($headers as $name => $values) { + foreach ((array)$values as $value) { + $data .= "$name: $value\r\n"; + } + } + $data .= "\r\n"; + + return $data; + } + + private function getUrlUserPass() + { + $components = parse_url($this->url); + + if (isset($components['user'])) { + return array( + 'user' => $components['user'], + 'pass' => isset($components['pass']) ? $components['pass'] : null, + ); + } + } + + private function getAuthHeaders() + { + if (null !== $auth = $this->getUrlUserPass()) { + return array( + 'Authorization' => 'Basic ' . base64_encode($auth['user'].':'.$auth['pass']), + ); + } + + return array(); + } + + public function isUpgradeRequest() + { + return isset($this->headers['Connection']) && strtolower($this->headers['Connection']) === "upgrade"; + } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } +} diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 3598d31a..e90f9b4a 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -9,6 +9,7 @@ use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\Connector; +use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; @@ -84,26 +85,7 @@ public function send(RequestInterface $request) $body = $request->getBody(); $size = $body->getSize(); - if ($size !== null && $size !== 0) { - // automatically assign a "Content-Length" request header if the body size is known and non-empty - $request = $request->withHeader('Content-Length', (string)$size); - } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { - // only assign a "Content-Length: 0" request header if the body is expected for certain methods - $request = $request->withHeader('Content-Length', '0'); - } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { - // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown - $request = $request->withHeader('Transfer-Encoding', 'chunked'); - } else { - // do not use chunked encoding if size is known or if this is an empty request body - $size = 0; - } - - // automatically add `Authorization: Basic …` request header if URL includes `user:pass@host` - if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) { - $request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo())); - } - - $requestStream = $this->http->request($request); + $requestStream = $this->createRequestStream($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled @@ -119,6 +101,10 @@ public function send(RequestInterface $request) $deferred->resolve($response); }); + $requestStream->on('upgrade', function (ConnectionInterface $socket) use ($deferred) { + $deferred->resolve($socket); + }); + if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { // length unknown => apply chunked transfer-encoding @@ -154,4 +140,58 @@ public function send(RequestInterface $request) return $deferred->promise(); } + + /** + * + * @internal + * @param RequestInterface $request + * @return PromiseInterface Promise + */ + public function upgrade(RequestInterface $request) + { + $requestStream = $this->createRequestStream($request); + + $deferred = new Deferred(function ($_, $reject) use ($requestStream) { + // close request stream if request is cancelled + $reject(new \RuntimeException('Request cancelled')); + $requestStream->close(); + }); + + $requestStream->on('error', function($error) use ($deferred) { + $deferred->reject($error); + }); + + $requestStream->on('upgrade', function (ConnectorInterface $socket) use ($deferred) { + $deferred->resolve($socket); + }); + + return $deferred->promise(); + } + + protected function createRequestStream(RequestInterface $request) + { + $body = $request->getBody(); + $size = $body->getSize(); + + if ($size !== null && $size !== 0) { + // automatically assign a "Content-Length" request header if the body size is known and non-empty + $request = $request->withHeader('Content-Length', (string)$size); + } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { + // only assign a "Content-Length: 0" request header if the body is expected for certain methods + $request = $request->withHeader('Content-Length', '0'); + } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { + // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown + $request = $request->withHeader('Transfer-Encoding', 'chunked'); + } else { + // do not use chunked encoding if size is known or if this is an empty request body + $size = 0; + } + + // automatically add `Authorization: Basic …` request header if URL includes `user:pass@host` + if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) { + $request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo())); + } + + return $this->http->request($request); + } } diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index b93c490c..a581348b 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -36,6 +36,8 @@ class Transaction private $streaming = false; + private $upgrade = false; + private $maximumSize = 16777216; // 16 MiB = 2^24 bytes public function __construct(Sender $sender, LoopInterface $loop) From 4e27825ba54d07b3993be7d0d9b3937ed23d4e63 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:21:57 +0100 Subject: [PATCH 02/17] Add upgrade listener and provide option for upgrade in Transaction --- src/Client/UpgradedResponse.php | 56 +++++++++++++++++++++++++++++++++ src/Io/Sender.php | 36 +++++---------------- src/Io/Transaction.php | 2 ++ 3 files changed, 65 insertions(+), 29 deletions(-) create mode 100644 src/Client/UpgradedResponse.php diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php new file mode 100644 index 00000000..795869e8 --- /dev/null +++ b/src/Client/UpgradedResponse.php @@ -0,0 +1,56 @@ +connection = $connection; + $this->response = $response; + $this->request = $request; + } + + /** + * @return ConnectionInterface + */ + public function getConnection() + { + return $this->connection; + } + + /** + * @return ResponseInterface + */ + public function getResponse() + { + return $this->response; + } + + /** + * @return RequestInterface + */ + public function getRequest() + { + return $this->request; + } +} \ No newline at end of file diff --git a/src/Io/Sender.php b/src/Io/Sender.php index e90f9b4a..9da298eb 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -6,6 +6,7 @@ use Psr\Http\Message\ResponseInterface; use React\EventLoop\LoopInterface; use React\Http\Client\Client as HttpClient; +use React\Http\Client\UpgradedResponse; use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\Connector; @@ -101,8 +102,12 @@ public function send(RequestInterface $request) $deferred->resolve($response); }); - $requestStream->on('upgrade', function (ConnectionInterface $socket) use ($deferred) { - $deferred->resolve($socket); + /** + * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately + * This is useful for websocket connections that requires an HTTP Upgrade. + */ + $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $deferred->resolve(new UpgradedResponse($socket, $response, $request)); }); if ($body instanceof ReadableStreamInterface) { @@ -141,33 +146,6 @@ public function send(RequestInterface $request) return $deferred->promise(); } - /** - * - * @internal - * @param RequestInterface $request - * @return PromiseInterface Promise - */ - public function upgrade(RequestInterface $request) - { - $requestStream = $this->createRequestStream($request); - - $deferred = new Deferred(function ($_, $reject) use ($requestStream) { - // close request stream if request is cancelled - $reject(new \RuntimeException('Request cancelled')); - $requestStream->close(); - }); - - $requestStream->on('error', function($error) use ($deferred) { - $deferred->reject($error); - }); - - $requestStream->on('upgrade', function (ConnectorInterface $socket) use ($deferred) { - $deferred->resolve($socket); - }); - - return $deferred->promise(); - } - protected function createRequestStream(RequestInterface $request) { $body = $request->getBody(); diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index a581348b..b5371d64 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -147,6 +147,8 @@ private function next(RequestInterface $request, Deferred $deferred, ClientReque $promise = $this->sender->send($request); + if ($this->upgrade) return $promise; + if (!$this->streaming) { $promise = $promise->then(function ($response) use ($deferred, $state, $that) { return $that->bufferResponse($response, $deferred, $state); From 116b0f06d1c951e0a68e13bfb91cb944ea4195b8 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:23:31 +0100 Subject: [PATCH 03/17] Add upgrade listener and provide option for upgrade in Transaction --- src/Io/Sender.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 9da298eb..1c764e93 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -76,7 +76,7 @@ public function __construct(HttpClient $http) * * @internal * @param RequestInterface $request - * @return PromiseInterface Promise + * @return PromiseInterface Promise */ public function send(RequestInterface $request) { From 8e1cd533590eec4fd0643c4232d293b6d7704fe8 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:30:05 +0100 Subject: [PATCH 04/17] Reverted Scope --- src/Browser.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Browser.php b/src/Browser.php index e651cca6..2e19cc4d 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -822,7 +822,7 @@ public function withoutHeader($header) * @see self::withFollowRedirects() * @see self::withRejectErrorResponse() */ - public function withOptions(array $options) + private function withOptions(array $options) { $browser = clone $this; $browser->transaction = $this->transaction->withOptions($options); From c0827e6db35d382bab4bdbc0ab952210a10db756 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:48:17 +0100 Subject: [PATCH 05/17] Configures sender to upgrade or not --- src/Io/Sender.php | 11 +++++++---- src/Io/Transaction.php | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 1c764e93..00dc9414 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -76,9 +76,10 @@ public function __construct(HttpClient $http) * * @internal * @param RequestInterface $request + * @param bool $upgrade Configures the sender to listen for upgrade * @return PromiseInterface Promise */ - public function send(RequestInterface $request) + public function send(RequestInterface $request, $upgrade = false) { // support HTTP/1.1 and HTTP/1.0 only, ensured by `Browser` already assert(\in_array($request->getProtocolVersion(), array('1.0', '1.1'), true)); @@ -106,9 +107,11 @@ public function send(RequestInterface $request) * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately * This is useful for websocket connections that requires an HTTP Upgrade. */ - $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { - $deferred->resolve(new UpgradedResponse($socket, $response, $request)); - }); + if ($upgrade) { + $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $deferred->resolve(new UpgradedResponse($socket, $response, $request)); + }); + } if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index b5371d64..b8f15ae7 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -36,6 +36,7 @@ class Transaction private $streaming = false; + // Determines whether to return the connection of an upgrade or not private $upgrade = false; private $maximumSize = 16777216; // 16 MiB = 2^24 bytes @@ -145,7 +146,7 @@ private function next(RequestInterface $request, Deferred $deferred, ClientReque $that = $this; ++$state->numRequests; - $promise = $this->sender->send($request); + $promise = $this->sender->send($request, $this->upgrade); if ($this->upgrade) return $promise; From 4423afeffe0b78fdc8d5e76dd28c47ca20490d48 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:16:15 +0100 Subject: [PATCH 06/17] Add check for upgrade on response instead of request --- src/Client/Request.php | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 88c7e9a5..f27c644b 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -68,7 +68,7 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe static $headerParsed = false; - if (!$requestData->isUpgradeRequest() || $headerParsed || false == strpos($buffer, "\r\n\r\n")) { + if ($headerParsed || false == strpos($buffer, "\r\n\r\n")) { return $that->handleData($data); } @@ -76,6 +76,10 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $response = gPsr\parse_response($buffer); + if (!$that->responseIsAnUpgradeResponse($response)) { + return $that->handleData($data); + } + $streamRef->removeListener('data', $headerParser); $that->emit('upgrade', array($streamRef, $response, $that)); @@ -110,6 +114,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe }); } + protected function responseIsAnUpgradeResponse($response) + { + return + $response->hasHeader('Connection') && + (in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) && + (int) $response->getStatusCode() === 101; + } + public function write($data) { if (!$this->isWritable()) { From 629b549fec5fb7fbffe3a5ca5c83183e9e5d817d Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:16:51 +0100 Subject: [PATCH 07/17] Remove isUpgradeRequest --- src/Client/RequestData.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php index c37c0065..c3b0f685 100644 --- a/src/Client/RequestData.php +++ b/src/Client/RequestData.php @@ -126,11 +126,6 @@ private function getAuthHeaders() return array(); } - public function isUpgradeRequest() - { - return isset($this->headers['Connection']) && strtolower($this->headers['Connection']) === "upgrade"; - } - /** * @return array */ From e142e66ad6afaebb8bcf4c083469646e35d76f92 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:18:52 +0100 Subject: [PATCH 08/17] Remove requestUpgrade method --- src/Browser.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Browser.php b/src/Browser.php index 2e19cc4d..f52ddffc 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -426,11 +426,6 @@ public function requestStreaming($method, $url, $headers = array(), $body = '') return $this->withOptions(array('streaming' => true))->requestMayBeStreaming($method, $url, $headers, $body); } - public function requestUpgrade($method, $url, $headers = array(), $contents = '') - { - return $this->withOptions(array('upgrade' => true))->requestMayBeStreaming($method, $url, $headers, $contents); - } - /** * Changes the maximum timeout used for waiting for pending requests. * From 06aea6aacfaf4952f1f5ae40af5fbf55db0e7744 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 22:08:48 +0100 Subject: [PATCH 09/17] Replaced ConnectionInteface With DuplexStreamInterface In upgraded response --- src/Client/UpgradedResponse.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php index 795869e8..8659b304 100644 --- a/src/Client/UpgradedResponse.php +++ b/src/Client/UpgradedResponse.php @@ -4,12 +4,12 @@ use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; -use React\Socket\ConnectionInterface; +use React\Stream\DuplexStreamInterface; class UpgradedResponse { /** - * @var ConnectionInterface + * @var DuplexStreamInterface */ private $connection; @@ -23,7 +23,7 @@ class UpgradedResponse */ private $request; - public function __construct(ConnectionInterface $connection, ResponseInterface $response, RequestInterface $request) + public function __construct(DuplexStreamInterface $connection, ResponseInterface $response, RequestInterface $request) { $this->connection = $connection; $this->response = $response; @@ -31,7 +31,7 @@ public function __construct(ConnectionInterface $connection, ResponseInterface $ } /** - * @return ConnectionInterface + * @return DuplexStreamInterface */ public function getConnection() { From 422e0d25c6fd500b9939f32df71a10fbcd81bd35 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 22:11:38 +0100 Subject: [PATCH 10/17] Replaced ConnectionInteface With DuplexStreamInterface In Sender --- src/Io/Sender.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 00dc9414..204b5257 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -12,6 +12,7 @@ use React\Socket\Connector; use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; +use React\Stream\DuplexStreamInterface; use React\Stream\ReadableStreamInterface; /** @@ -108,7 +109,7 @@ public function send(RequestInterface $request, $upgrade = false) * This is useful for websocket connections that requires an HTTP Upgrade. */ if ($upgrade) { - $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) { $deferred->resolve(new UpgradedResponse($socket, $response, $request)); }); } From c5aab0dc9b3b29d254333d24bfd0f17c51a082d7 Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 01:10:26 +0100 Subject: [PATCH 11/17] Start fixing tests --- src/Client/Request.php | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index f27c644b..0a026d98 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -63,7 +63,7 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $stream->on('drain', array($that, 'handleDrain')); $buffer = ''; - $headerParser = function ($data) use ($requestData, &$headerParser, $buffer, $streamRef, $that) { + $headerParser = function ($data) use (&$headerParser, $buffer, $streamRef, $that) { $buffer .= $data; static $headerParsed = false; @@ -76,17 +76,17 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $response = gPsr\parse_response($buffer); - if (!$that->responseIsAnUpgradeResponse($response)) { - return $that->handleData($data); - } + if ($that->responseIsAnUpgradeResponse($response)) { + $that->stream->removeListener('data', $headerParser); - $streamRef->removeListener('data', $headerParser); + $that->emit('upgrade', array($that->stream, $response, $that)); + return; + } - $that->emit('upgrade', array($streamRef, $response, $that)); + return $that->handleData($data); }; - $stream->on('data', $headerParser); - + $stream->on('data', array($that, 'handleData')); $stream->on('end', array($that, 'handleEnd')); $stream->on('error', array($that, 'handleError')); $stream->on('close', array($that, 'handleClose')); @@ -168,7 +168,29 @@ public function handleData($data) { $this->buffer .= $data; + static $headerParsed = false; + + if ($headerParsed || false == strpos($this->buffer, "\r\n\r\n")) { + return $this->handleEx(); + } + + $headerParsed = true; + + $response = gPsr\parse_response($this->buffer); + + if ($this->responseIsAnUpgradeResponse($response)) { + $this->stream->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->stream, $response, $this)); + return; + } + // buffer until double CRLF (or double LF for compatibility with legacy servers) + return $this->handleEx(); + } + + protected function handleEx() + { if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { try { list($response, $bodyChunk) = $this->parseResponse($this->buffer); From 70bc4388b723c7475730edf2b68e8fe4a4bb1985 Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 01:53:05 +0100 Subject: [PATCH 12/17] Refactored some parts so tests can pass --- src/Client/Request.php | 62 ++++++++---------------------------------- src/Io/Sender.php | 3 +- 2 files changed, 13 insertions(+), 52 deletions(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 0a026d98..1466ef66 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -61,31 +61,6 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $streamRef = $stream; $stream->on('drain', array($that, 'handleDrain')); - - $buffer = ''; - $headerParser = function ($data) use (&$headerParser, $buffer, $streamRef, $that) { - $buffer .= $data; - - static $headerParsed = false; - - if ($headerParsed || false == strpos($buffer, "\r\n\r\n")) { - return $that->handleData($data); - } - - $headerParsed = true; - - $response = gPsr\parse_response($buffer); - - if ($that->responseIsAnUpgradeResponse($response)) { - $that->stream->removeListener('data', $headerParser); - - $that->emit('upgrade', array($that->stream, $response, $that)); - return; - } - - return $that->handleData($data); - }; - $stream->on('data', array($that, 'handleData')); $stream->on('end', array($that, 'handleEnd')); $stream->on('error', array($that, 'handleError')); @@ -168,32 +143,19 @@ public function handleData($data) { $this->buffer .= $data; - static $headerParsed = false; - - if ($headerParsed || false == strpos($this->buffer, "\r\n\r\n")) { - return $this->handleEx(); - } - - $headerParsed = true; - - $response = gPsr\parse_response($this->buffer); - - if ($this->responseIsAnUpgradeResponse($response)) { - $this->stream->removeListener('data', array($this, 'handleData')); - - $this->emit('upgrade', array($this->stream, $response, $this)); - return; - } - // buffer until double CRLF (or double LF for compatibility with legacy servers) - return $this->handleEx(); - } - - protected function handleEx() - { if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { try { - list($response, $bodyChunk) = $this->parseResponse($this->buffer); + $psrResponse = gPsr\parse_response($this->buffer); + + if ($this->responseIsAnUpgradeResponse($psrResponse)) { + $this->stream->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->stream, $psrResponse, $this)); + return; + } + + list($response, $bodyChunk) = $this->parseResponse($psrResponse); } catch (\InvalidArgumentException $exception) { $this->emit('error', array($exception)); } @@ -277,9 +239,9 @@ public function close() $this->removeAllListeners(); } - protected function parseResponse($data) + protected function parseResponse($psrResponse) { - $psrResponse = gPsr\parse_response($data); +// $psrResponse = gPsr\parse_response($data); $headers = array_map(function($val) { if (1 === count($val)) { $val = $val[0]; diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 204b5257..41a09ec7 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -86,9 +86,8 @@ public function send(RequestInterface $request, $upgrade = false) assert(\in_array($request->getProtocolVersion(), array('1.0', '1.1'), true)); $body = $request->getBody(); - $size = $body->getSize(); - $requestStream = $this->createRequestStream($request); + list($size, $requestStream) = $this->createRequestStream($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled From da27836dfd5d83a52bc03622125a0716db5afbcf Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 02:06:56 +0100 Subject: [PATCH 13/17] Remove comment --- src/Client/Request.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 1466ef66..520386dd 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -241,7 +241,6 @@ public function close() protected function parseResponse($psrResponse) { -// $psrResponse = gPsr\parse_response($data); $headers = array_map(function($val) { if (1 === count($val)) { $val = $val[0]; From 1c2e15e6add2ac98f1af1b87047d04faacd420bb Mon Sep 17 00:00:00 2001 From: Bosun Egberinde Date: Sun, 26 Feb 2023 00:28:40 +0100 Subject: [PATCH 14/17] Pull upstream changes --- src/Client/Request.php | 316 -------------------------------- src/Client/RequestData.php | 136 -------------- src/Client/UpgradedResponse.php | 2 +- src/Io/ClientRequestStream.php | 15 ++ src/Io/Sender.php | 2 +- 5 files changed, 17 insertions(+), 454 deletions(-) delete mode 100644 src/Client/Request.php delete mode 100644 src/Client/RequestData.php diff --git a/src/Client/Request.php b/src/Client/Request.php deleted file mode 100644 index 520386dd..00000000 --- a/src/Client/Request.php +++ /dev/null @@ -1,316 +0,0 @@ -connector = $connector; - $this->requestData = $requestData; - } - - public function isWritable() - { - return self::STATE_END > $this->state && !$this->ended; - } - - private function writeHead() - { - $this->state = self::STATE_WRITING_HEAD; - - $requestData = $this->requestData; - $streamRef = &$this->stream; - $stateRef = &$this->state; - $pendingWrites = &$this->pendingWrites; - $that = $this; - - $promise = $this->connect(); - $promise->then( - function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites, $that) { - $streamRef = $stream; - - $stream->on('drain', array($that, 'handleDrain')); - $stream->on('data', array($that, 'handleData')); - $stream->on('end', array($that, 'handleEnd')); - $stream->on('error', array($that, 'handleError')); - $stream->on('close', array($that, 'handleClose')); - - $headers = (string) $requestData; - - $more = $stream->write($headers . $pendingWrites); - - $stateRef = Request::STATE_HEAD_WRITTEN; - - // clear pending writes if non-empty - if ($pendingWrites !== '') { - $pendingWrites = ''; - - if ($more) { - $that->emit('drain'); - } - } - }, - array($this, 'closeError') - ); - - $this->on('close', function() use ($promise) { - $promise->cancel(); - }); - } - - protected function responseIsAnUpgradeResponse($response) - { - return - $response->hasHeader('Connection') && - (in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) && - (int) $response->getStatusCode() === 101; - } - - public function write($data) - { - if (!$this->isWritable()) { - return false; - } - - // write directly to connection stream if already available - if (self::STATE_HEAD_WRITTEN <= $this->state) { - return $this->stream->write($data); - } - - // otherwise buffer and try to establish connection - $this->pendingWrites .= $data; - if (self::STATE_WRITING_HEAD > $this->state) { - $this->writeHead(); - } - - return false; - } - - public function end($data = null) - { - if (!$this->isWritable()) { - return; - } - - if (null !== $data) { - $this->write($data); - } else if (self::STATE_WRITING_HEAD > $this->state) { - $this->writeHead(); - } - - $this->ended = true; - } - - /** @internal */ - public function handleDrain() - { - $this->emit('drain'); - } - - /** @internal */ - public function handleData($data) - { - $this->buffer .= $data; - - // buffer until double CRLF (or double LF for compatibility with legacy servers) - if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { - try { - $psrResponse = gPsr\parse_response($this->buffer); - - if ($this->responseIsAnUpgradeResponse($psrResponse)) { - $this->stream->removeListener('data', array($this, 'handleData')); - - $this->emit('upgrade', array($this->stream, $psrResponse, $this)); - return; - } - - list($response, $bodyChunk) = $this->parseResponse($psrResponse); - } catch (\InvalidArgumentException $exception) { - $this->emit('error', array($exception)); - } - - $this->buffer = null; - - $this->stream->removeListener('drain', array($this, 'handleDrain')); - $this->stream->removeListener('data', array($this, 'handleData')); - $this->stream->removeListener('end', array($this, 'handleEnd')); - $this->stream->removeListener('error', array($this, 'handleError')); - $this->stream->removeListener('close', array($this, 'handleClose')); - - if (!isset($response)) { - return; - } - - $response->on('close', array($this, 'close')); - $that = $this; - $response->on('error', function (\Exception $error) use ($that) { - $that->closeError(new \RuntimeException( - "An error occured in the response", - 0, - $error - )); - }); - - $this->emit('response', array($response, $this)); - - $this->stream->emit('data', array($bodyChunk)); - } - } - - /** @internal */ - public function handleEnd() - { - $this->closeError(new \RuntimeException( - "Connection ended before receiving response" - )); - } - - /** @internal */ - public function handleError(\Exception $error) - { - $this->closeError(new \RuntimeException( - "An error occurred in the underlying stream", - 0, - $error - )); - } - - /** @internal */ - public function handleClose() - { - $this->close(); - } - - /** @internal */ - public function closeError(\Exception $error) - { - if (self::STATE_END <= $this->state) { - return; - } - $this->emit('error', array($error)); - $this->close(); - } - - public function close() - { - if (self::STATE_END <= $this->state) { - return; - } - - $this->state = self::STATE_END; - $this->pendingWrites = ''; - - if ($this->stream) { - $this->stream->close(); - } - - $this->emit('close'); - $this->removeAllListeners(); - } - - protected function parseResponse($psrResponse) - { - $headers = array_map(function($val) { - if (1 === count($val)) { - $val = $val[0]; - } - - return $val; - }, $psrResponse->getHeaders()); - - $factory = $this->getResponseFactory(); - - $response = $factory( - 'HTTP', - $psrResponse->getProtocolVersion(), - $psrResponse->getStatusCode(), - $psrResponse->getReasonPhrase(), - $headers - ); - - return array($response, (string)($psrResponse->getBody())); - } - - protected function connect() - { - $scheme = $this->requestData->getScheme(); - if ($scheme !== 'https' && $scheme !== 'http') { - return Promise\reject( - new \InvalidArgumentException('Invalid request URL given') - ); - } - - $host = $this->requestData->getHost(); - $port = $this->requestData->getPort(); - - if ($scheme === 'https') { - $host = 'tls://' . $host; - } - - return $this->connector - ->connect($host . ':' . $port); - } - - public function setResponseFactory($factory) - { - $this->responseFactory = $factory; - } - - public function getResponseFactory() - { - if (null === $factory = $this->responseFactory) { - $stream = $this->stream; - - $factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use ($stream) { - return new Response( - $stream, - $protocol, - $version, - $code, - $reasonPhrase, - $headers - ); - }; - - $this->responseFactory = $factory; - } - - return $factory; - } - - public function getRequestData() - { - return $this->requestData; - } -} diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php deleted file mode 100644 index c3b0f685..00000000 --- a/src/Client/RequestData.php +++ /dev/null @@ -1,136 +0,0 @@ -method = $method; - $this->url = $url; - $this->headers = $headers; - $this->protocolVersion = $protocolVersion; - } - - private function mergeDefaultheaders(array $headers) - { - $port = ($this->getDefaultPort() === $this->getPort()) ? '' : ":{$this->getPort()}"; - $connectionHeaders = ('1.1' === $this->protocolVersion) ? array('Connection' => 'close') : array(); - $authHeaders = $this->getAuthHeaders(); - - $defaults = array_merge( - array( - 'Host' => $this->getHost().$port, - 'User-Agent' => 'ReactPHP/1', - ), - $connectionHeaders, - $authHeaders - ); - - // remove all defaults that already exist in $headers - $lower = array_change_key_case($headers, CASE_LOWER); - foreach ($defaults as $key => $_) { - if (isset($lower[strtolower($key)])) { - unset($defaults[$key]); - } - } - - return array_merge($defaults, $headers); - } - - public function getScheme() - { - return parse_url($this->url, PHP_URL_SCHEME); - } - - public function getHost() - { - return parse_url($this->url, PHP_URL_HOST); - } - - public function getPort() - { - return (int) parse_url($this->url, PHP_URL_PORT) ?: $this->getDefaultPort(); - } - - public function getDefaultPort() - { - return ('https' === $this->getScheme()) ? 443 : 80; - } - - public function getPath() - { - $path = parse_url($this->url, PHP_URL_PATH); - $queryString = parse_url($this->url, PHP_URL_QUERY); - - // assume "/" path by default, but allow "OPTIONS *" - if ($path === null) { - $path = ($this->method === 'OPTIONS' && $queryString === null) ? '*': '/'; - } - if ($queryString !== null) { - $path .= '?' . $queryString; - } - - return $path; - } - - public function setProtocolVersion($version) - { - $this->protocolVersion = $version; - } - - public function __toString() - { - $headers = $this->mergeDefaultheaders($this->headers); - - $data = ''; - $data .= "{$this->method} {$this->getPath()} HTTP/{$this->protocolVersion}\r\n"; - foreach ($headers as $name => $values) { - foreach ((array)$values as $value) { - $data .= "$name: $value\r\n"; - } - } - $data .= "\r\n"; - - return $data; - } - - private function getUrlUserPass() - { - $components = parse_url($this->url); - - if (isset($components['user'])) { - return array( - 'user' => $components['user'], - 'pass' => isset($components['pass']) ? $components['pass'] : null, - ); - } - } - - private function getAuthHeaders() - { - if (null !== $auth = $this->getUrlUserPass()) { - return array( - 'Authorization' => 'Basic ' . base64_encode($auth['user'].':'.$auth['pass']), - ); - } - - return array(); - } - - /** - * @return array - */ - public function getHeaders() - { - return $this->headers; - } -} diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php index 8659b304..fa259fe5 100644 --- a/src/Client/UpgradedResponse.php +++ b/src/Client/UpgradedResponse.php @@ -53,4 +53,4 @@ public function getRequest() { return $this->request; } -} \ No newline at end of file +} diff --git a/src/Io/ClientRequestStream.php b/src/Io/ClientRequestStream.php index 0220f008..91c761e7 100644 --- a/src/Io/ClientRequestStream.php +++ b/src/Io/ClientRequestStream.php @@ -156,6 +156,13 @@ public function handleData($data) try { $response = gPsr\parse_response($this->buffer); $bodyChunk = (string) $response->getBody(); + + if ($this->responseIsAnUpgradeResponse($response)) { + $this->connection->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->connection, $response, $this)); + return; + } } catch (\InvalidArgumentException $exception) { $this->closeError($exception); return; @@ -216,6 +223,14 @@ public function handleData($data) } } + protected function responseIsAnUpgradeResponse($response) + { + return + $response->hasHeader('Connection') && + (in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) && + (int) $response->getStatusCode() === 101; + } + /** @internal */ public function handleEnd() { diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 41a09ec7..38296601 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -173,6 +173,6 @@ protected function createRequestStream(RequestInterface $request) $request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo())); } - return $this->http->request($request); + return array($size, $this->http->request($request)); } } From 73c5640827be81a6f18604874459be2f7d0e87ce Mon Sep 17 00:00:00 2001 From: Bosun Egberinde Date: Wed, 1 Mar 2023 23:47:51 +0100 Subject: [PATCH 15/17] Adjusted changes --- src/Browser.php | 3 +- src/Client/UpgradedResponse.php | 56 --------------------------- src/Io/ClientRequestStream.php | 14 +++---- src/Io/DuplexBodyStream.php | 39 +++++++++++++++++++ src/Io/Sender.php | 67 +++++++++++---------------------- src/Io/Transaction.php | 7 +--- 6 files changed, 70 insertions(+), 116 deletions(-) delete mode 100644 src/Client/UpgradedResponse.php create mode 100644 src/Io/DuplexBodyStream.php diff --git a/src/Browser.php b/src/Browser.php index f52ddffc..ad9187a6 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -10,7 +10,6 @@ use React\Http\Io\Transaction; use React\Http\Message\Request; use React\Promise\PromiseInterface; -use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; use InvalidArgumentException; @@ -830,7 +829,7 @@ private function withOptions(array $options) * @param string $url * @param array $headers * @param string|ReadableStreamInterface $body - * @return PromiseInterface + * @return PromiseInterface */ private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '') { diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php deleted file mode 100644 index fa259fe5..00000000 --- a/src/Client/UpgradedResponse.php +++ /dev/null @@ -1,56 +0,0 @@ -connection = $connection; - $this->response = $response; - $this->request = $request; - } - - /** - * @return DuplexStreamInterface - */ - public function getConnection() - { - return $this->connection; - } - - /** - * @return ResponseInterface - */ - public function getResponse() - { - return $this->response; - } - - /** - * @return RequestInterface - */ - public function getRequest() - { - return $this->request; - } -} diff --git a/src/Io/ClientRequestStream.php b/src/Io/ClientRequestStream.php index 91c761e7..7181b46e 100644 --- a/src/Io/ClientRequestStream.php +++ b/src/Io/ClientRequestStream.php @@ -156,13 +156,6 @@ public function handleData($data) try { $response = gPsr\parse_response($this->buffer); $bodyChunk = (string) $response->getBody(); - - if ($this->responseIsAnUpgradeResponse($response)) { - $this->connection->removeListener('data', array($this, 'handleData')); - - $this->emit('upgrade', array($this->connection, $response, $this)); - return; - } } catch (\InvalidArgumentException $exception) { $this->closeError($exception); return; @@ -206,7 +199,12 @@ public function handleData($data) } elseif ($response->hasHeader('Content-Length')) { $length = (int) $response->getHeaderLine('Content-Length'); } - $response = $response->withBody($body = new ReadableBodyStream($body, $length)); + + $body = $this->responseIsAnUpgradeResponse($response) + ? new DuplexBodyStream($connection) + : new ReadableBodyStream($body, $length); + + $response = $response->withBody($body); $body->on('end', function () use (&$successfulEndReceived) { $successfulEndReceived = true; }); diff --git a/src/Io/DuplexBodyStream.php b/src/Io/DuplexBodyStream.php new file mode 100644 index 00000000..f7899dea --- /dev/null +++ b/src/Io/DuplexBodyStream.php @@ -0,0 +1,39 @@ +connection = $connection; + parent::__construct($connection); + } + + public function isWritable() + { + return $this->connection->isWritable(); + } + + public function write($data) + { + return $this->connection->write($data); + } + + public function end($data = null) + { + return $this->connection->end($data); + } + + public function close() + { + return $this->connection->close(); + } +} diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 38296601..3598d31a 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -6,13 +6,10 @@ use Psr\Http\Message\ResponseInterface; use React\EventLoop\LoopInterface; use React\Http\Client\Client as HttpClient; -use React\Http\Client\UpgradedResponse; use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\Connector; -use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; -use React\Stream\DuplexStreamInterface; use React\Stream\ReadableStreamInterface; /** @@ -77,17 +74,36 @@ public function __construct(HttpClient $http) * * @internal * @param RequestInterface $request - * @param bool $upgrade Configures the sender to listen for upgrade - * @return PromiseInterface Promise + * @return PromiseInterface Promise */ - public function send(RequestInterface $request, $upgrade = false) + public function send(RequestInterface $request) { // support HTTP/1.1 and HTTP/1.0 only, ensured by `Browser` already assert(\in_array($request->getProtocolVersion(), array('1.0', '1.1'), true)); $body = $request->getBody(); + $size = $body->getSize(); + + if ($size !== null && $size !== 0) { + // automatically assign a "Content-Length" request header if the body size is known and non-empty + $request = $request->withHeader('Content-Length', (string)$size); + } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { + // only assign a "Content-Length: 0" request header if the body is expected for certain methods + $request = $request->withHeader('Content-Length', '0'); + } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { + // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown + $request = $request->withHeader('Transfer-Encoding', 'chunked'); + } else { + // do not use chunked encoding if size is known or if this is an empty request body + $size = 0; + } + + // automatically add `Authorization: Basic …` request header if URL includes `user:pass@host` + if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) { + $request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo())); + } - list($size, $requestStream) = $this->createRequestStream($request); + $requestStream = $this->http->request($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled @@ -103,16 +119,6 @@ public function send(RequestInterface $request, $upgrade = false) $deferred->resolve($response); }); - /** - * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately - * This is useful for websocket connections that requires an HTTP Upgrade. - */ - if ($upgrade) { - $requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) { - $deferred->resolve(new UpgradedResponse($socket, $response, $request)); - }); - } - if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { // length unknown => apply chunked transfer-encoding @@ -148,31 +154,4 @@ public function send(RequestInterface $request, $upgrade = false) return $deferred->promise(); } - - protected function createRequestStream(RequestInterface $request) - { - $body = $request->getBody(); - $size = $body->getSize(); - - if ($size !== null && $size !== 0) { - // automatically assign a "Content-Length" request header if the body size is known and non-empty - $request = $request->withHeader('Content-Length', (string)$size); - } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { - // only assign a "Content-Length: 0" request header if the body is expected for certain methods - $request = $request->withHeader('Content-Length', '0'); - } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { - // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown - $request = $request->withHeader('Transfer-Encoding', 'chunked'); - } else { - // do not use chunked encoding if size is known or if this is an empty request body - $size = 0; - } - - // automatically add `Authorization: Basic …` request header if URL includes `user:pass@host` - if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) { - $request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo())); - } - - return array($size, $this->http->request($request)); - } } diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index b8f15ae7..b93c490c 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -36,9 +36,6 @@ class Transaction private $streaming = false; - // Determines whether to return the connection of an upgrade or not - private $upgrade = false; - private $maximumSize = 16777216; // 16 MiB = 2^24 bytes public function __construct(Sender $sender, LoopInterface $loop) @@ -146,9 +143,7 @@ private function next(RequestInterface $request, Deferred $deferred, ClientReque $that = $this; ++$state->numRequests; - $promise = $this->sender->send($request, $this->upgrade); - - if ($this->upgrade) return $promise; + $promise = $this->sender->send($request); if (!$this->streaming) { $promise = $promise->then(function ($response) use ($deferred, $state, $that) { From 7459d709dd7ed059036dcec090e01fcb5c87bd28 Mon Sep 17 00:00:00 2001 From: Bosun Egberinde Date: Fri, 3 Mar 2023 19:36:06 +0100 Subject: [PATCH 16/17] Update client --- src/Io/ClientRequestStream.php | 4 +++- src/Io/DuplexBodyStream.php | 9 ++------- src/Io/Transaction.php | 9 +++++++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/Io/ClientRequestStream.php b/src/Io/ClientRequestStream.php index 7181b46e..61305c24 100644 --- a/src/Io/ClientRequestStream.php +++ b/src/Io/ClientRequestStream.php @@ -183,7 +183,9 @@ public function handleData($data) if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) { $connectionManager->keepAlive($request->getUri(), $connection); } else { - $connection->close(); + if (! $that->responseIsAnUpgradeResponse($response)) { + $connection->close(); + } } $that->close(); diff --git a/src/Io/DuplexBodyStream.php b/src/Io/DuplexBodyStream.php index f7899dea..254bf805 100644 --- a/src/Io/DuplexBodyStream.php +++ b/src/Io/DuplexBodyStream.php @@ -3,12 +3,12 @@ namespace React\Http\Io; use React\Socket\ConnectionInterface; -use React\Stream\WritableStreamInterface; +use React\Stream\DuplexStreamInterface; /** * @internal */ -class DuplexBodyStream extends ReadableBodyStream implements WritableStreamInterface +class DuplexBodyStream extends ReadableBodyStream implements DuplexStreamInterface { private $connection; public function __construct(ConnectionInterface $connection) @@ -31,9 +31,4 @@ public function end($data = null) { return $this->connection->end($data); } - - public function close() - { - return $this->connection->close(); - } } diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index b93c490c..e1250221 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -244,8 +244,8 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques return $this->onResponseRedirect($response, $request, $deferred, $state); } - // only status codes 200-399 are considered to be valid, reject otherwise - if ($this->obeySuccessCode && ($response->getStatusCode() < 200 || $response->getStatusCode() >= 400)) { + // only status codes 100-399 are considered to be valid, reject otherwise + if ($this->obeySuccessCode && $this->failed($response)) { throw new ResponseException($response); } @@ -253,6 +253,11 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques return $response; } + private function failed(ResponseInterface $response) + { + return $response->getStatusCode() >= 500 || ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500); + } + /** * @param ResponseInterface $response * @param RequestInterface $request From 697df67b0ca8c856e1847d93fef7736ea6302c76 Mon Sep 17 00:00:00 2001 From: Bosun Egberinde Date: Sat, 4 Mar 2023 14:20:07 +0100 Subject: [PATCH 17/17] Add more context to ResponseException --- src/Io/Transaction.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index e1250221..145409af 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -246,7 +246,11 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques // only status codes 100-399 are considered to be valid, reject otherwise if ($this->obeySuccessCode && $this->failed($response)) { - throw new ResponseException($response); + $requestString = \RingCentral\Psr7\str($request); + $message = 'HTTP status code ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ')'; + $message .= ": {$request->getUri()}\n{$requestString}"; + + throw new ResponseException($response, $message); } // resolve our initial promise