Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Commit 97ed9a4

Browse files
committed
Adjusted changes
1 parent 1c2e15e commit 97ed9a4

File tree

5 files changed

+69
-114
lines changed

5 files changed

+69
-114
lines changed

src/Client/UpgradedResponse.php

-56
This file was deleted.

src/Io/ClientRequestStream.php

+6-8
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,6 @@ public function handleData($data)
156156
try {
157157
$response = gPsr\parse_response($this->buffer);
158158
$bodyChunk = (string) $response->getBody();
159-
160-
if ($this->responseIsAnUpgradeResponse($response)) {
161-
$this->connection->removeListener('data', array($this, 'handleData'));
162-
163-
$this->emit('upgrade', array($this->connection, $response, $this));
164-
return;
165-
}
166159
} catch (\InvalidArgumentException $exception) {
167160
$this->closeError($exception);
168161
return;
@@ -206,7 +199,12 @@ public function handleData($data)
206199
} elseif ($response->hasHeader('Content-Length')) {
207200
$length = (int) $response->getHeaderLine('Content-Length');
208201
}
209-
$response = $response->withBody($body = new ReadableBodyStream($body, $length));
202+
203+
$body = $this->responseIsAnUpgradeResponse($response)
204+
? new DuplexBodyStream($connection)
205+
: new ReadableBodyStream($body, $length);
206+
207+
$response = $response->withBody($body);
210208
$body->on('end', function () use (&$successfulEndReceived) {
211209
$successfulEndReceived = true;
212210
});

src/Io/DuplexBodyStream.php

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
namespace React\Http\Io;
4+
5+
use React\Socket\ConnectionInterface;
6+
use React\Stream\WritableStreamInterface;
7+
8+
/**
9+
* @internal
10+
*/
11+
class DuplexBodyStream extends ReadableBodyStream implements WritableStreamInterface
12+
{
13+
private $connection;
14+
public function __construct(ConnectionInterface $connection)
15+
{
16+
$this->connection = $connection;
17+
parent::__construct($connection);
18+
}
19+
20+
public function isWritable()
21+
{
22+
return $this->connection->isWritable();
23+
}
24+
25+
public function write($data)
26+
{
27+
return $this->connection->write($data);
28+
}
29+
30+
public function end($data = null)
31+
{
32+
return $this->connection->end($data);
33+
}
34+
35+
public function close()
36+
{
37+
return $this->connection->close();
38+
}
39+
}

src/Io/Sender.php

+23-44
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@
66
use Psr\Http\Message\ResponseInterface;
77
use React\EventLoop\LoopInterface;
88
use React\Http\Client\Client as HttpClient;
9-
use React\Http\Client\UpgradedResponse;
109
use React\Promise\PromiseInterface;
1110
use React\Promise\Deferred;
1211
use React\Socket\Connector;
13-
use React\Socket\ConnectionInterface;
1412
use React\Socket\ConnectorInterface;
15-
use React\Stream\DuplexStreamInterface;
1613
use React\Stream\ReadableStreamInterface;
1714

1815
/**
@@ -77,17 +74,36 @@ public function __construct(HttpClient $http)
7774
*
7875
* @internal
7976
* @param RequestInterface $request
80-
* @param bool $upgrade Configures the sender to listen for upgrade
81-
* @return PromiseInterface Promise<ResponseInterface, ConnectionInterface, Exception>
77+
* @return PromiseInterface Promise<ResponseInterface, Exception>
8278
*/
83-
public function send(RequestInterface $request, $upgrade = false)
79+
public function send(RequestInterface $request)
8480
{
8581
// support HTTP/1.1 and HTTP/1.0 only, ensured by `Browser` already
8682
assert(\in_array($request->getProtocolVersion(), array('1.0', '1.1'), true));
8783

8884
$body = $request->getBody();
85+
$size = $body->getSize();
86+
87+
if ($size !== null && $size !== 0) {
88+
// automatically assign a "Content-Length" request header if the body size is known and non-empty
89+
$request = $request->withHeader('Content-Length', (string)$size);
90+
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
91+
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
92+
$request = $request->withHeader('Content-Length', '0');
93+
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
94+
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
95+
$request = $request->withHeader('Transfer-Encoding', 'chunked');
96+
} else {
97+
// do not use chunked encoding if size is known or if this is an empty request body
98+
$size = 0;
99+
}
100+
101+
// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
102+
if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) {
103+
$request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo()));
104+
}
89105

90-
list($size, $requestStream) = $this->createRequestStream($request);
106+
$requestStream = $this->http->request($request);
91107

92108
$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
93109
// close request stream if request is cancelled
@@ -103,16 +119,6 @@ public function send(RequestInterface $request, $upgrade = false)
103119
$deferred->resolve($response);
104120
});
105121

106-
/**
107-
* We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately
108-
* This is useful for websocket connections that requires an HTTP Upgrade.
109-
*/
110-
if ($upgrade) {
111-
$requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) {
112-
$deferred->resolve(new UpgradedResponse($socket, $response, $request));
113-
});
114-
}
115-
116122
if ($body instanceof ReadableStreamInterface) {
117123
if ($body->isReadable()) {
118124
// length unknown => apply chunked transfer-encoding
@@ -148,31 +154,4 @@ public function send(RequestInterface $request, $upgrade = false)
148154

149155
return $deferred->promise();
150156
}
151-
152-
protected function createRequestStream(RequestInterface $request)
153-
{
154-
$body = $request->getBody();
155-
$size = $body->getSize();
156-
157-
if ($size !== null && $size !== 0) {
158-
// automatically assign a "Content-Length" request header if the body size is known and non-empty
159-
$request = $request->withHeader('Content-Length', (string)$size);
160-
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
161-
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
162-
$request = $request->withHeader('Content-Length', '0');
163-
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
164-
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
165-
$request = $request->withHeader('Transfer-Encoding', 'chunked');
166-
} else {
167-
// do not use chunked encoding if size is known or if this is an empty request body
168-
$size = 0;
169-
}
170-
171-
// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
172-
if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) {
173-
$request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo()));
174-
}
175-
176-
return array($size, $this->http->request($request));
177-
}
178157
}

src/Io/Transaction.php

+1-6
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ class Transaction
3636

3737
private $streaming = false;
3838

39-
// Determines whether to return the connection of an upgrade or not
40-
private $upgrade = false;
41-
4239
private $maximumSize = 16777216; // 16 MiB = 2^24 bytes
4340

4441
public function __construct(Sender $sender, LoopInterface $loop)
@@ -146,9 +143,7 @@ private function next(RequestInterface $request, Deferred $deferred, ClientReque
146143
$that = $this;
147144
++$state->numRequests;
148145

149-
$promise = $this->sender->send($request, $this->upgrade);
150-
151-
if ($this->upgrade) return $promise;
146+
$promise = $this->sender->send($request);
152147

153148
if (!$this->streaming) {
154149
$promise = $promise->then(function ($response) use ($deferred, $state, $that) {

0 commit comments

Comments
 (0)